Prechádzať zdrojové kódy

changed QSA - prepared for internal subscription parameters and reduced memory allocations

Vaclav Kubart 19 rokov pred
rodič
commit
80ec566662

+ 4 - 0
lib/presence/ChangeLog

@@ -1,3 +1,7 @@
+2006-04-11
+	* reduced memory allocations in QSA
+	* prepared for parameters to QSA subscriptions
+
 2006-04-10
 2006-04-10
 	* changed QSA - corrected content-type propagation
 	* changed QSA - corrected content-type propagation
 
 

+ 16 - 4
lib/presence/notifier.h

@@ -51,13 +51,13 @@ void unregister_notifier(notifier_domain_t *domain, notifier_t *info);
  *
  *
  * Note: only for asynchonously processed subscriptions (synchronous 
  * Note: only for asynchonously processed subscriptions (synchronous 
  * don't need it) */
  * don't need it) */
-void accept_subscription(subscription_t *s);
+void accept_subscription(qsa_subscription_t *s);
 
 
 /** releases accepted subscription - MUST be called on all accepted 
 /** releases accepted subscription - MUST be called on all accepted 
  * subscriptions (only on them!) to be freed from memory !
  * subscriptions (only on them!) to be freed from memory !
  * Note: only for asynchonously processed subscriptions (synchronous 
  * Note: only for asynchonously processed subscriptions (synchronous 
  * don't need it) */
  * don't need it) */
-void release_subscription(subscription_t *s);
+void release_subscription(qsa_subscription_t *s);
 
 
 /** This structure is sent via message queue
 /** This structure is sent via message queue
  * to client. It must contain all information
  * to client. It must contain all information
@@ -72,7 +72,7 @@ typedef enum {
 
 
 typedef struct {
 typedef struct {
 	/* replacement for record_id, package, ... it is much more efficient */
 	/* replacement for record_id, package, ... it is much more efficient */
-	subscription_t *subscription; 
+	qsa_subscription_t *subscription; 
 	qsa_content_type_t *content_type;
 	qsa_content_type_t *content_type;
 	void *data;
 	void *data;
 	int data_len;
 	int data_len;
@@ -83,12 +83,24 @@ typedef struct {
 void free_client_notify_info_content(client_notify_info_t *info);
 void free_client_notify_info_content(client_notify_info_t *info);
 
 
 /* notifications SHOULD be sent through this method */
 /* notifications SHOULD be sent through this method */
-int notify_subscriber(subscription_t *s, 
+int notify_subscriber(qsa_subscription_t *s, 
 		notifier_t *n,
 		notifier_t *n,
 		qsa_content_type_t *content_type, 
 		qsa_content_type_t *content_type, 
 		void *data, 
 		void *data, 
 		qsa_subscription_status_t status);
 		qsa_subscription_status_t status);
 
 
+/* this can be called in notifier and the returned value is valid
+ * before finishes "unsubscribe" processing */
+str_t *get_subscriber_id(qsa_subscription_t *s);
+
+/* this can be called in notifier and the returned value is valid
+ * before finishes "unsubscribe" processing */
+str_t *get_record_id(qsa_subscription_t *s);
+
+/* this can be called in notifier and the returned value is valid
+ * before finishes "unsubscribe" processing */
+void *get_subscriber_data(qsa_subscription_t *s);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif

+ 65 - 43
lib/presence/notifier_domain.c

@@ -36,19 +36,19 @@
 /*#define lock_subscription_data(s) if (s->mutex) cds_mutex_lock(s->mutex);
 /*#define lock_subscription_data(s) if (s->mutex) cds_mutex_lock(s->mutex);
 #define unlock_subscription_data(s) if (s->mutex) cds_mutex_unlock(s->mutex);*/
 #define unlock_subscription_data(s) if (s->mutex) cds_mutex_unlock(s->mutex);*/
 
 
-static void lock_subscription_data(subscription_t *s)
+static void lock_subscription_data(qsa_subscription_t *s)
 {
 {
 	/* is function due to debugging */
 	/* is function due to debugging */
 	if (s->mutex) cds_mutex_lock(s->mutex);
 	if (s->mutex) cds_mutex_lock(s->mutex);
 }
 }
 
 
-static void unlock_subscription_data(subscription_t *s) {
+static void unlock_subscription_data(qsa_subscription_t *s) {
 	/* is function due to debugging */
 	/* is function due to debugging */
 	if (s->mutex) cds_mutex_unlock(s->mutex);
 	if (s->mutex) cds_mutex_unlock(s->mutex);
 }
 }
 
 
 static void free_notifier(notifier_t *info);
 static void free_notifier(notifier_t *info);
-static void free_subscription(subscription_t *s);
+static void free_subscription(qsa_subscription_t *s);
 
 
 /* -------- package functions -------- */
 /* -------- package functions -------- */
 
 
@@ -109,7 +109,7 @@ static notifier_package_t *get_package(notifier_domain_t *d, const str_t *name)
 static void destroy_package(notifier_package_t *p) 
 static void destroy_package(notifier_package_t *p) 
 {
 {
 	/* notifier_t *e, *n; */
 	/* notifier_t *e, *n; */
-	subscription_t *s, *ns;
+	qsa_subscription_t *s, *ns;
 	
 	
 	/* release all subscriptions ???  */
 	/* release all subscriptions ???  */
 	s = p->first_subscription;
 	s = p->first_subscription;
@@ -203,15 +203,13 @@ static void free_notifier(notifier_t *info)
 	cds_free(info);
 	cds_free(info);
 }
 }
 
 
-static void free_subscription(subscription_t *s)
+static void free_subscription(qsa_subscription_t *s)
 {
 {
-	DEBUG_LOG("freeing subscription to %.*s\n", FMT_STR(s->record_id));
-	str_free_content(&s->record_id);
-	str_free_content(&s->subscriber_id);
+	DEBUG_LOG("freeing subscription to %p\n", s);
 	cds_free(s);
 	cds_free(s);
 }
 }
 
 
-/*static void add_server_subscription(notifier_t *n, subscription_t *s)
+/*static void add_server_subscription(notifier_t *n, qsa_subscription_t *s)
 {
 {
 	server_subscription_t server_s;
 	server_subscription_t server_s;
 	
 	
@@ -223,7 +221,7 @@ static void free_subscription(subscription_t *s)
 	else ERROR_LOG("subscription not accepted by notifier %p\n", n);
 	else ERROR_LOG("subscription not accepted by notifier %p\n", n);
 }
 }
 			
 			
-static void remove_notifier_from_subscription(subscription_t *s, notifier_t *n)
+static void remove_notifier_from_subscription(qsa_subscription_t *s, notifier_t *n)
 {
 {
 	int cnt,i;
 	int cnt,i;
 
 
@@ -314,7 +312,7 @@ notifier_t *register_notifier(
 {
 {
 	notifier_t *info;
 	notifier_t *info;
 	notifier_package_t *p;
 	notifier_package_t *p;
-	subscription_t *s;
+	qsa_subscription_t *s;
 
 
 	lock_notifier_domain(domain);
 	lock_notifier_domain(domain);
 	p = get_package(domain, package);
 	p = get_package(domain, package);
@@ -362,7 +360,7 @@ void unregister_notifier(notifier_domain_t *domain, notifier_t *info)
 		/* accepted subscriptions MUST be removed by the notifier 
 		/* accepted subscriptions MUST be removed by the notifier 
 		 * how to solve this ? */
 		 * how to solve this ? */
 		
 		
-		/* subscription_t *s;
+		/* qsa_subscription_t *s;
 		s = p->first_subscription;
 		s = p->first_subscription;
 		while (s) {
 		while (s) {
 			CAN NOT be called !!!!! info->unsubscribe(info, s);
 			CAN NOT be called !!!!! info->unsubscribe(info, s);
@@ -380,28 +378,24 @@ void unregister_notifier(notifier_domain_t *domain, notifier_t *info)
 
 
 /* If a notifier publishing watched state registeres after subscibe
 /* If a notifier publishing watched state registeres after subscibe
  * call, it receives the subscription automaticaly too! */
  * call, it receives the subscription automaticaly too! */
-subscription_t *subscribe(notifier_domain_t *domain, 
+qsa_subscription_t *subscribe(notifier_domain_t *domain, 
 		str_t *package,
 		str_t *package,
-		str_t *record_id,
-		str_t *subscriber_id,
-		msg_queue_t *dst,
-		void *subscriber_data)
+		qsa_subscription_data_t *data)
 {
 {
-	subscription_t *s;
+	qsa_subscription_t *s;
 	notifier_t *e;
 	notifier_t *e;
 	notifier_package_t *p;
 	notifier_package_t *p;
 	int cnt = 0;
 	int cnt = 0;
-	int res;
 
 
 	lock_notifier_domain(domain);
 	lock_notifier_domain(domain);
 	p = get_package(domain, package);
 	p = get_package(domain, package);
 	if (!p) {
 	if (!p) {
-		ERROR_LOG("can't find package for subscription\n");
+		ERROR_LOG("can't find/add package for subscription\n");
 		unlock_notifier_domain(domain);
 		unlock_notifier_domain(domain);
 		return NULL;
 		return NULL;
 	}
 	}
 	
 	
-	s = cds_malloc(sizeof(subscription_t));
+	s = cds_malloc(sizeof(qsa_subscription_t));
 	if (!s) {
 	if (!s) {
 		ERROR_LOG("can't allocate memory\n");
 		ERROR_LOG("can't allocate memory\n");
 		unlock_notifier_domain(domain);
 		unlock_notifier_domain(domain);
@@ -409,20 +403,9 @@ subscription_t *subscribe(notifier_domain_t *domain,
 	}
 	}
 
 
 	s->package = p;
 	s->package = p;
-	s->dst = dst;
 	s->mutex = &domain->data_mutex;
 	s->mutex = &domain->data_mutex;
-	s->subscriber_data = subscriber_data;
-	res = str_dup(&s->record_id, record_id);
-	if (res == 0) res = str_dup(&s->subscriber_id, subscriber_id);
-	else str_clear(&s->subscriber_id);
-	if (res != 0) {
-		str_free_content(&s->record_id);
-		str_free_content(&s->subscriber_id);
-		cds_free(s);
-		ERROR_LOG("can't allocate memory\n");
-		unlock_notifier_domain(domain);
-		return NULL;
-	}
+	s->data = data;
+	s->allow_notifications = 1;
 	init_reference_counter(&s->ref);
 	init_reference_counter(&s->ref);
 
 
 	DOUBLE_LINKED_LIST_ADD(p->first_subscription, p->last_subscription, s);
 	DOUBLE_LINKED_LIST_ADD(p->first_subscription, p->last_subscription, s);
@@ -447,27 +430,27 @@ subscription_t *subscribe(notifier_domain_t *domain,
 	return s;
 	return s;
 }
 }
 	
 	
-void release_subscription(subscription_t *s)
+void release_subscription(qsa_subscription_t *s)
 {
 {
 	if (!s) return;
 	if (!s) return;
 	if (remove_reference(&s->ref)) free_subscription(s);
 	if (remove_reference(&s->ref)) free_subscription(s);
 }
 }
 
 
-void accept_subscription(subscription_t *s)
+void accept_subscription(qsa_subscription_t *s)
 {
 {
 	if (!s) return;
 	if (!s) return;
 	add_reference(&s->ref);
 	add_reference(&s->ref);
 }
 }
 
 
 /** Destroys an existing subscription - can be called ONLY by client !!! */
 /** Destroys an existing subscription - can be called ONLY by client !!! */
-void unsubscribe(notifier_domain_t *domain, subscription_t *s)
+void unsubscribe(notifier_domain_t *domain, qsa_subscription_t *s)
 {
 {
 	notifier_package_t *p;
 	notifier_package_t *p;
 	notifier_t *e;
 	notifier_t *e;
 
 
 	/* mark subscription as un-notifyable */
 	/* mark subscription as un-notifyable */
 	lock_subscription_data(s);
 	lock_subscription_data(s);
-	s->dst = NULL;
+	s->allow_notifications = 0;
 	unlock_subscription_data(s);
 	unlock_subscription_data(s);
 
 
 	lock_notifier_domain(domain);
 	lock_notifier_domain(domain);
@@ -489,14 +472,19 @@ void unsubscribe(notifier_domain_t *domain, subscription_t *s)
 	
 	
 	unlock_notifier_domain(domain);
 	unlock_notifier_domain(domain);
 	
 	
+	/* mark subscription data as invalid */
+	lock_subscription_data(s);
+	s->data = NULL;
+	unlock_subscription_data(s);
+	
 	/* remove clients reference (dont give references to client?) */
 	/* remove clients reference (dont give references to client?) */
 	remove_reference(&s->ref);
 	remove_reference(&s->ref);
 	
 	
 	release_subscription(s); 
 	release_subscription(s); 
 }
 }
 
 
-/* void notify_subscriber(subscription_t *s, mq_message_t *msg) */
-int notify_subscriber(subscription_t *s, 
+/* void notify_subscriber(qsa_subscription_t *s, mq_message_t *msg) */
+int notify_subscriber(qsa_subscription_t *s, 
 		notifier_t *n,
 		notifier_t *n,
 		qsa_content_type_t *content_type, 
 		qsa_content_type_t *content_type, 
 		void *data, 
 		void *data, 
@@ -535,9 +523,11 @@ int notify_subscriber(subscription_t *s,
 		info->status = status;
 		info->status = status;
 		
 		
 		lock_subscription_data(s);
 		lock_subscription_data(s);
-		if (s->dst) {
-			if (push_message(s->dst, msg) < 0) ok = 0;
-			else sent = 1;
+		if ((s->allow_notifications) && (s->data)) {
+			if (s->data->dst) {
+				if (push_message(s->data->dst, msg) < 0) ok = 0;
+				else sent = 1;
+			}
 		}
 		}
 		unlock_subscription_data(s);
 		unlock_subscription_data(s);
 	}
 	}
@@ -565,3 +555,35 @@ void free_client_notify_info_content(client_notify_info_t *info)
 	else ERR("BUG: content-type not given! Possible memory leaks!\n");
 	else ERR("BUG: content-type not given! Possible memory leaks!\n");
 }
 }
 
 
+/* this can be called in notifier and the returned value is valid
+ * before finishes "unsubscribe" processing */
+str_t *get_subscriber_id(qsa_subscription_t *s)
+{
+	if (!s) return NULL;
+	if (!s->data) return NULL;
+	return &s->data->subscriber_id;
+}
+
+/* this can be called in notifier and the returned value is valid
+ * before finishes "unsubscribe" processing */
+str_t *get_record_id(qsa_subscription_t *s)
+{
+	if (!s) return NULL;
+	if (!s->data) return NULL;
+	return &s->data->record_id;
+}
+
+/* this can be called in notifier and the returned value is valid
+ * before finishes "unsubscribe" processing */
+void *get_subscriber_data(qsa_subscription_t *s)
+{
+	if (!s) return NULL;
+	if (!s->data) return NULL;
+	return s->data->subscriber_data;
+}
+
+void clear_subscription_data(qsa_subscription_data_t *data)
+{
+	if (data) memset(data, 0, sizeof(*data));
+}
+

+ 23 - 14
lib/presence/notifier_domain.h

@@ -29,15 +29,15 @@
 #include <cds/sstr.h>
 #include <cds/sstr.h>
 #include <cds/ptr_vector.h>
 #include <cds/ptr_vector.h>
 #include <cds/sync.h>
 #include <cds/sync.h>
-#include <cds/msg_queue.h>
-#include <cds/ref_cntr.h>
+
+#include <presence/qsa_params.h>
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus
 extern "C" {
 extern "C" {
 #endif
 #endif
 
 
-struct _subscription_t;
-typedef struct _subscription_t subscription_t;
+struct _qsa_subscription_t;
+typedef struct _qsa_subscription_t qsa_subscription_t;
 struct _notifier_package_t;
 struct _notifier_package_t;
 typedef struct _notifier_package_t notifier_package_t;
 typedef struct _notifier_package_t notifier_package_t;
 struct _notifier_t;
 struct _notifier_t;
@@ -45,26 +45,35 @@ typedef struct _notifier_t notifier_t;
 struct _notifier_domain_t;
 struct _notifier_domain_t;
 typedef struct _notifier_domain_t notifier_domain_t;
 typedef struct _notifier_domain_t notifier_domain_t;
 
 
-/* typedef void (*client_notify_func)(client_notify_info_t *info); */
+/* data hold by subscriber for the time of subscription duration 
+ * (from subscribe to unsubscribe; after calling unsubscribe can
+ * be destroyed contents of them) */
+typedef struct _qsa_subscription_data_t {
+	msg_queue_t *dst;
+	str_t record_id;
+	str_t subscriber_id;
+	qsa_subscription_params_t *first_param;
+	void *subscriber_data;
+} qsa_subscription_data_t;
 
 
 /** Internal structure holding informations about
 /** Internal structure holding informations about
  * created client subscriptions.
  * created client subscriptions.
  */
  */
-struct _subscription_t {
+struct _qsa_subscription_t {
 	/* client_notify_func notify; */
 	/* client_notify_func notify; */
 	cds_mutex_t *mutex;
 	cds_mutex_t *mutex;
-	msg_queue_t *dst;
-	str_t record_id;
-	str_t subscriber_id;
 	notifier_package_t *package;
 	notifier_package_t *package;
-	void *subscriber_data;
-	struct _subscription_t *prev, *next;
+	int allow_notifications;
+	qsa_subscription_data_t *data;
+	struct _qsa_subscription_t *prev, *next;
 	reference_counter_data_t ref;
 	reference_counter_data_t ref;
 };
 };
 
 
-typedef int (*server_subscribe_func)(notifier_t *n, subscription_t *subscription);
+/* typedef void (*client_notify_func)(client_notify_info_t *info); */
+
+typedef int (*server_subscribe_func)(notifier_t *n, qsa_subscription_t *subscription);
 
 
-typedef void (*server_unsubscribe_func)(notifier_t *n, subscription_t *subscription);
+typedef void (*server_unsubscribe_func)(notifier_t *n, qsa_subscription_t *subscription);
 
 
 typedef struct _qsa_content_type_t {
 typedef struct _qsa_content_type_t {
 	struct _qsa_content_type_t *next, *prev;
 	struct _qsa_content_type_t *next, *prev;
@@ -87,7 +96,7 @@ struct _notifier_package_t {
 	/* maybe: serialize and deserialize methods */
 	/* maybe: serialize and deserialize methods */
 	notifier_domain_t *domain;
 	notifier_domain_t *domain;
 	notifier_t *first_notifier, *last_notifier; /* notifiers are linked in theirs package! */
 	notifier_t *first_notifier, *last_notifier; /* notifiers are linked in theirs package! */
-	subscription_t *first_subscription, *last_subscription;
+	qsa_subscription_t *first_subscription, *last_subscription;
 	notifier_package_t *next, *prev;
 	notifier_package_t *next, *prev;
 };
 };
 
 

+ 2 - 0
lib/presence/qsa_params.c

@@ -0,0 +1,2 @@
+#include <presence/qsa_params.h>
+

+ 23 - 0
lib/presence/qsa_params.h

@@ -0,0 +1,23 @@
+#ifndef __QSA_PARAMS_H
+#define __QSA_PARAMS_H
+
+#include <cds/sstr.h>
+#include <cds/ptr_vector.h>
+#include <cds/msg_queue.h>
+#include <cds/ref_cntr.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+	
+typedef struct _qsa_subscription_params_t {
+	str_t name;
+	str_t value; /* whatever */
+	char buf[1];
+} qsa_subscription_params_t;
+
+#ifdef __cplusplus
+}
+#endif
+	
+#endif

+ 9 - 10
lib/presence/subscriber.h

@@ -36,20 +36,19 @@ extern "C" {
 
 
 /* If a notifier publishing watched state registeres after subscibe
 /* If a notifier publishing watched state registeres after subscibe
  * call, it receives the subscription automaticaly too! */
  * call, it receives the subscription automaticaly too! */
-/*subscription_t *subscribe(notifier_domain_t *domain, 
-		subscription_t *params);*/
-subscription_t *subscribe(notifier_domain_t *domain, 
+/*qsa_subscription_t *subscribe(notifier_domain_t *domain, 
+		qsa_subscription_t *params);*/
+qsa_subscription_t *subscribe(notifier_domain_t *domain, 
 		str_t *package,
 		str_t *package,
-		str_t *record_id,
-		str_t *subscriber_id,
-		msg_queue_t *dst,
-		void *subscriber_data);
+		qsa_subscription_data_t *data);
 
 
 /** Destroys an existing subscription */
 /** Destroys an existing subscription */
-void unsubscribe(notifier_domain_t *domain, subscription_t *s);
+void unsubscribe(notifier_domain_t *domain, qsa_subscription_t *s);
 
 
-void set_subscriber_data(subscription_t *s, void *data);
-void *get_subscriber_data(subscription_t *s);
+void set_subscriber_data(qsa_subscription_t *s, void *data);
+void *get_subscriber_data(qsa_subscription_t *s);
+
+void clear_subscription_data(qsa_subscription_data_t *data);
 	
 	
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }