2
0
Эх сурвалжийг харах

topos: load and pack db branch data

Daniel-Constantin Mierla 9 жил өмнө
parent
commit
a741f797a5

+ 0 - 6
modules/topos/topos_mod.c

@@ -292,12 +292,6 @@ int tps_msg_received(void *data)
 		}
 	} else {
 		/* reply */
-		if(msg.via2==0) {
-			/* one Via in received reply -- it is for local generated request
-			 * - nothing to unhide unless is CANCEL/ACK */
-			if(!((get_cseq(&msg)->method_id)&(METHOD_CANCEL)))
-				goto done;
-		}
 		tps_response_received(&msg);
 	}
 

+ 39 - 20
modules/topos/tps_msg.c

@@ -295,26 +295,6 @@ int tps_skip_msg(sip_msg_t *msg)
 	return 0;
 }
 
-/**
- *
- */
-int tps_request_received(sip_msg_t *msg, int dialog, int direction)
-{
-	if(dialog==0) {
-		/* nothing to do for initial request */
-		return 0;
-	}
-	return 0;
-}
-
-/**
- *
- */
-int tps_response_received(sip_msg_t *msg)
-{
-	return 0;
-}
-
 /**
  *
  */
@@ -456,6 +436,45 @@ int tps_reinsert_contact(sip_msg_t *msg, tps_data_t *ptsd, str *hbody)
 	return 0;
 }
 
+/**
+ *
+ */
+int tps_request_received(sip_msg_t *msg, int dialog, int direction)
+{
+	if(dialog==0) {
+		/* nothing to do for initial request */
+		return 0;
+	}
+	return 0;
+}
+
+/**
+ *
+ */
+int tps_response_received(sip_msg_t *msg)
+{
+	tps_data_t mtsd;
+	tps_data_t stsd;
+	tps_data_t *ptsd;
+	str lkey;
+
+	memset(&mtsd, 0, sizeof(tps_data_t));
+	memset(&stsd, 0, sizeof(tps_data_t));
+	ptsd = &mtsd;
+
+	if(tps_pack_request(msg, &mtsd)<0) {
+		LM_ERR("failed to extract and pack the headers\n");
+		return -1;
+	}
+	if(tps_storage_load_dialog(msg, &mtsd, &stsd)<0) {
+		goto error;
+	}
+
+	return 0;
+
+error:
+	return -1;
+}
 
 /**
  *

+ 131 - 1
modules/topos/tps_storage.c

@@ -354,8 +354,10 @@ str tt_col_a_uuid = str_init("a_uuid");
 str tt_col_b_uuid = str_init("b_uuid");
 str tt_col_direction = str_init("direction");
 str tt_col_x_via = str_init("x_via");
-str tt_col_x_tag = str_init("x_tag");
 str tt_col_x_vbranch = str_init("x_vbranch");
+str tt_col_x_rr = str_init("x_rr");
+str tt_col_x_uri = str_init("x_uri");
+str tt_col_x_tag = str_init("x_tag");
 
 #define TPS_NR_KEYS	32
 
@@ -613,7 +615,135 @@ int tps_db_clean_branches(void)
 	return 0;
 }
 
+#define TPS_DATA_APPEND_DB(_sd, _res, _c, _s)	\
+	do { \
+		if (RES_ROWS(_res)[0].values[_c].nul == 0) \
+		{ \
+			str tmp; \
+			switch(RES_ROWS(_res)[0].values[_c].type) \
+			{ \
+			case DB1_STRING: \
+				tmp.s=(char*)RES_ROWS(_res)[0].values[_c].val.string_val; \
+				tmp.len=strlen(tmp.s); \
+				break; \
+			case DB1_STR: \
+				tmp.len=RES_ROWS(_res)[0].values[_c].val.str_val.len; \
+				tmp.s=(char*)RES_ROWS(_res)[0].values[_c].val.str_val.s; \
+				break; \
+			case DB1_BLOB: \
+				tmp.len=RES_ROWS(_res)[0].values[_c].val.blob_val.len; \
+				tmp.s=(char*)RES_ROWS(_res)[0].values[_c].val.blob_val.s; \
+				break; \
+			default: \
+				tmp.len=0; \
+				tmp.s=NULL; \
+			} \
+			if((_sd)->cp + tmp.len >= (_sd)->cbuf + TPS_DATA_SIZE) { \
+				LM_ERR("not enough space for %d\n", _c); \
+				goto error; \
+			} \
+			if(tmp.len>0) { \
+				(_s)->s = (_sd)->cp; \
+				(_s)->len = tmp.len; \
+				memcpy((_sd)->cp, tmp.s, tmp.len); \
+				(_sd)->cp += tmp.len; \
+				(_sd)->cp[0] = '\0'; \
+				(_sd)->cp++; \
+			} \
+		} \
+	} while(0);
+/**
+ *
+ */
+int tps_storage_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd)
+{
+	db_key_t db_keys[4];
+	db_op_t  db_ops[4];
+	db_val_t db_vals[4];
+	db_key_t db_cols[TPS_NR_KEYS];
+	db1_res_t* db_res = NULL;
+	int nr_keys;
+	int nr_cols;
+	int n;
+
+	if(msg==NULL || md==NULL || sd==NULL || _tps_db_handle==NULL)
+		return -1;
+
+	nr_keys = 0;
+	nr_cols = 0;
+
+	db_keys[nr_keys]=&tt_col_x_vbranch;
+	db_ops[nr_keys]=OP_EQ;
+	db_vals[nr_keys].type = DB1_STR;
+	db_vals[nr_keys].nul = 0;
+	db_vals[nr_keys].val.str_val = TPS_STRZ(md->x_vbranch1);
+	nr_keys++;
+
+	db_cols[nr_cols++] = &tt_col_rectime;
+	db_cols[nr_cols++] = &tt_col_a_callid;
+	db_cols[nr_cols++] = &tt_col_a_uuid;
+	db_cols[nr_cols++] = &tt_col_b_uuid;
+	db_cols[nr_cols++] = &tt_col_direction;
+	db_cols[nr_cols++] = &tt_col_x_via;
+	db_cols[nr_cols++] = &tt_col_x_vbranch;
+	db_cols[nr_cols++] = &tt_col_x_rr;
+	db_cols[nr_cols++] = &tt_col_x_uri;
+	db_cols[nr_cols++] = &tt_col_x_tag;
+
+	if (_tpsdbf.use_table(_tps_db_handle, &tt_table_name) < 0) {
+		LM_ERR("failed to perform use table\n");
+		return -1;
+	}
+
+	if (_tpsdbf.query(_tps_db_handle, db_keys, db_ops, db_vals, db_cols,
+				nr_keys, nr_cols, NULL, &db_res) < 0) {
+		LM_ERR("failed to query database\n");
+		goto error;
+	}
 
+	if (RES_ROW_N(db_res) <= 0) {
+		LM_DBG("no stored record for <%.*s>\n",
+				md->x_vbranch1.len, ZSW(md->x_vbranch1.s));
+		return 1;
+	}
+
+	sd->cp = sd->cbuf;
+
+	n = 0;
+	n++; /*rectime*/
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->a_callid);
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->a_uuid);
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->b_uuid);
+	n++; /*direction*/
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->x_via);
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->x_vbranch1);
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->x_rr);
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->x_uri);
+	TPS_DATA_APPEND_DB(sd, db_res, n++, &sd->x_tag);
+
+	if ((db_res !=NULL) && _tpsdbf.free_result(_tps_db_handle, db_res) < 0)
+		LM_ERR("failed to free result of query\n");
+
+	return 0;
+
+error:
+	if ((db_res !=NULL) && _tpsdbf.free_result(_tps_db_handle, db_res) < 0)
+		LM_ERR("failed to free result of query\n");
+
+	return -1;
+}
+
+/**
+ *
+ */
+int tps_storage_load_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd)
+{
+	return 0;
+}
+
+/**
+ *
+ */
 void tps_storage_clean(unsigned int ticks, void* param)
 {
 	tps_db_clean_branches();

+ 5 - 1
modules/topos/tps_storage.h

@@ -34,7 +34,7 @@
 #define TPS_DIR_DOWNSTREAM	0
 #define TPS_DIR_UPSTREAM	1
 
-#define TPS_DATA_SIZE	4096
+#define TPS_DATA_SIZE	8192
 typedef struct tps_data {
 	char cbuf[TPS_DATA_SIZE];
 	char *cp;
@@ -59,6 +59,8 @@ typedef struct tps_data {
 	str x_vbranch1;
 	str x_via;
 	str x_tag;
+	str x_rr;
+	str x_uri;
 	int32_t iflags;
 	int32_t direction;
 } tps_data_t;
@@ -72,6 +74,8 @@ int tps_storage_branch_save(sip_msg_t *msg, tps_data_t *td);
 int tps_storage_branch_rm(sip_msg_t *msg, tps_data_t *td);
 
 int tps_storage_record(sip_msg_t *msg, tps_data_t *td);
+int tps_storage_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd);
+int tps_storage_load_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd);
 
 int tps_storage_lock_set_init(void);
 int tps_storage_lock_get(str *lkey);