소스 검색

- improved cleanup of pa, rls and libs
- corrected error in watcherinfo authorization

Vaclav Kubart 20 년 전
부모
커밋
b4a18a2989
13개의 변경된 파일388개의 추가작업 그리고 33개의 파일을 삭제
  1. 2 0
      lib/Makefile
  2. 57 0
      lib/cds/cds.c
  3. 17 0
      lib/cds/cds.h
  4. 1 1
      lib/cds/logger.h
  5. 9 0
      lib/cds/msg_queue.c
  6. 13 0
      lib/cds/msg_queue.h
  7. 75 0
      lib/cds/ref_cntr.c
  8. 42 0
      lib/cds/ref_cntr.h
  9. 37 4
      lib/presence/domain_maintainer.c
  10. 10 0
      lib/presence/notifier.h
  11. 72 12
      lib/presence/notifier_domain.c
  12. 5 0
      lib/presence/notifier_domain.h
  13. 48 16
      lib/presence/qsa.c

+ 2 - 0
lib/Makefile

@@ -2,11 +2,13 @@
 
 INCLUDES += -I$(CURDIR)
 LIBS     += -L$(CURDIR)/cds -L$(CURDIR)/qsa
+DEFS     += -Wall
 
 ####################################
 # make rules
 
 export LIBS
+export DEFS
 export INCLUDES
 
 SUBDIRS=cds xcap presence

+ 57 - 0
lib/cds/cds.c

@@ -0,0 +1,57 @@
+#include <cds/cds.h>
+#include <cds/memory.h>
+#include <cds/sync.h>
+#include <cds/logger.h>
+
+typedef struct {
+	int init_cnt;
+} init_data_t;
+
+static init_data_t *init = NULL;
+
+/* these functions are internal and thus are not presented in headers !*/
+int reference_counter_initialize();
+void reference_counter_cleanup();
+	
+int cds_initialize()
+{
+	int res = 0;
+
+	/* initialization should be called from one process/thread 
+	 * it is not synchronized because it is impossible ! */
+	if (!init) {
+		init = (init_data_t*)cds_malloc(sizeof(init_data_t));
+		if (!init) return -1;
+		init->init_cnt = 0;
+	}
+
+	if (init->init_cnt > 0) { /* already initialized */
+		init->init_cnt++;
+		return 0;
+	}
+	else {
+		DEBUG_LOG("cds_initialize(): init the content\n");
+		
+		/* !!! put the real initialization here !!! */
+		res = reference_counter_initialize();
+	}
+			
+	if (res == 0) init->init_cnt++;
+	return res;
+}
+
+void cds_cleanup()
+{
+	if (init) {
+		if (--init->init_cnt == 0) {
+			DEBUG_LOG("cds_cleanup(): cleaning the content\n");
+			
+			/* !!! put the real destruction here !!! */
+			reference_counter_cleanup();
+		
+			cds_free(init);
+			init = NULL;
+		}
+	}
+}
+

+ 17 - 0
lib/cds/cds.h

@@ -0,0 +1,17 @@
+#ifndef __CDS_H
+#define __CDS_H
+
+/* declaration of initialization/destruction functions */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+	
+int cds_initialize();
+void cds_cleanup();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

+ 1 - 1
lib/cds/logger.h

@@ -42,7 +42,7 @@
 #include "dprint.h"
 
 #define ERROR_LOG(a,args...)		LOG(L_ERR,a,##args)
-#define DEBUG_LOG(a,args...)		LOG(L_DBG,a,##args)
+#define DEBUG_LOG(a,args...)		LOG(L_ERR,a,##args)
 #define TRACE_LOG(a,args...)		LOG(L_ERR,a,##args)
 #define WARN_LOG(a,args...)			LOG(L_WARN,a,##args)
 #define FLUSH_LOG()					do{}while(0)

+ 9 - 0
lib/cds/msg_queue.c

@@ -26,6 +26,7 @@
 #include <stdio.h>
 #include <cds/msg_queue.h>
 #include <cds/memory.h>
+#include <cds/ref_cntr.h>
 
 mq_message_t *create_message_ex(int data_len)
 {
@@ -155,6 +156,7 @@ int msg_queue_init(msg_queue_t *q)
 int msg_queue_init_ex(msg_queue_t *q, int synchronize)
 {
 	if (synchronize) cds_mutex_init(&q->q_mutex);
+	init_reference_counter(&q->ref);
 	q->use_mutex = synchronize;
 	q->first = NULL;
 	q->last = NULL;
@@ -181,4 +183,11 @@ void msg_queue_destroy(msg_queue_t *q)
 	}
 }
 
+void msg_queue_free(msg_queue_t *q)
+{
+	if (remove_reference(&q->ref)) {
+		msg_queue_destroy(q);
+		cds_free(q);
+	}
+}
 

+ 13 - 0
lib/cds/msg_queue.h

@@ -27,6 +27,11 @@
 #define __MSG_QUEUE_H
 
 #include <cds/sync.h>
+#include <cds/ref_cntr.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
 
 typedef void (*destroy_function_f)(void *);
 
@@ -43,6 +48,7 @@ typedef struct _mq_message_t {
 } mq_message_t;
 
 typedef struct msg_queue {
+	reference_counter_data_t ref;
 	mq_message_t *first;
 	mq_message_t *last;
 	cds_mutex_t q_mutex;
@@ -82,5 +88,12 @@ int msg_queue_init(msg_queue_t *q);
 int msg_queue_init_ex(msg_queue_t *q, int synchronize);
 void msg_queue_destroy(msg_queue_t *q);
 
+/* removes reference to message queue and frees it if no other references exist */
+void msg_queue_free(msg_queue_t *q);
+
+#ifdef __cplusplus
+}
+#endif
+
 #endif
 

+ 75 - 0
lib/cds/ref_cntr.c

@@ -0,0 +1,75 @@
+#include <cds/ref_cntr.h>
+#include <cds/logger.h>
+#include <cds/memory.h>
+
+/* One global mutex for reference counting may be enough. 
+ * If problems try to create pool of precreated mutexes
+ * and use them randomly.
+ */
+static cds_mutex_t *ref_cntr_mutex = NULL;
+
+/* global functions for initialization and destruction */
+
+int reference_counter_initialize()
+{
+	if (!ref_cntr_mutex) {
+		ref_cntr_mutex = (cds_mutex_t*)cds_malloc(sizeof(cds_mutex_t));
+		if (ref_cntr_mutex) {
+			cds_mutex_init(ref_cntr_mutex);
+			return 0;
+		}
+	}
+	return -1;
+}
+
+void reference_counter_cleanup()
+{
+	if (ref_cntr_mutex) {
+		cds_mutex_destroy(ref_cntr_mutex);
+		cds_free(ref_cntr_mutex);
+		ref_cntr_mutex = NULL;
+	}
+}
+
+/* -------------------------------------------------------------------- */
+
+void init_reference_counter(reference_counter_data_t *ref)
+{
+	if (ref) {
+		ref->cntr = 1;
+		ref->mutex = ref_cntr_mutex;
+	}
+}
+
+void add_reference(reference_counter_data_t *ref)
+{
+	if (ref) {
+		if (ref->mutex) cds_mutex_lock(ref->mutex);
+		ref->cntr++;
+		if (ref->mutex) cds_mutex_unlock(ref->mutex);
+	}
+}
+
+int get_reference_count(reference_counter_data_t *ref)
+{
+	int res = 0;
+	if (ref) {
+		if (ref->mutex) cds_mutex_lock(ref->mutex);
+		res = ref->cntr;
+		if (ref->mutex) cds_mutex_unlock(ref->mutex);
+	}
+	return res;
+}
+
+int remove_reference(reference_counter_data_t *ref)
+{
+	int res = 0;
+	if (ref) {
+		if (ref->mutex) cds_mutex_lock(ref->mutex);
+		if (ref->cntr > 0) ref->cntr--;
+		if (ref->cntr == 0) res = 1;
+		if (ref->mutex) cds_mutex_unlock(ref->mutex);
+	}
+	return res;
+}
+

+ 42 - 0
lib/cds/ref_cntr.h

@@ -0,0 +1,42 @@
+#ifndef __REFERENCE_CNTR_H
+#define __REFERENCE_CNTR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <cds/sync.h>
+	
+typedef struct {
+	int cntr;
+	cds_mutex_t *mutex;
+} reference_counter_data_t;
+	
+/* these functions can be called only by owner of at least one reference */
+/* owner is somebody who:
+ *    - created a referenced structure
+ *    - added a reference
+ *    - got a reference by an ovner
+ */
+
+void init_reference_counter(reference_counter_data_t *ref);
+void add_reference(reference_counter_data_t *ref);
+int get_reference_count(reference_counter_data_t *ref);
+
+/* returns:
+ * 0 if reference removed, but exist other references
+ * 1 if removed last refernce and the element SHOULD be freed
+ *
+ * usage:
+ * 
+ * some_structure *ss;
+ * ...
+ * if (remove_reference(&ss->ref)) cds_free(&ss->ref);
+ *  */
+int remove_reference(reference_counter_data_t *ref);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

+ 37 - 4
lib/presence/domain_maintainer.c

@@ -46,12 +46,17 @@ void destroy_domain_maintainer(domain_maintainer_t *dm)
 	notifier_domain_t *d;
 	
 	if (!dm) return;
+			
+	DEBUG_LOG("destroying domain maintainer\n");
 
 	cnt = ptr_vector_size(&dm->registered_domains);
 	for (i = 0; i < cnt; i++) {
 		d = ptr_vector_get(&dm->registered_domains, i);
 		if (!d) continue;
-		destroy_notifier_domain(d);
+		if (remove_reference(&d->ref)) {
+			DEBUG_LOG("freeing domain: \'%.*s\'\n", FMT_STR(d->name));
+			destroy_notifier_domain(d);
+		}
 	}
 	ptr_vector_destroy(&dm->registered_domains);
 	cds_mutex_destroy(&dm->mutex);
@@ -77,13 +82,14 @@ static notifier_domain_t *add_domain_nolock(domain_maintainer_t *dm, const str_t
 	notifier_domain_t *d = create_notifier_domain(name);
 	
 	if (d) {
+		DEBUG_LOG("created domain: \'%.*s\'\n", FMT_STR(d->name));
 		ptr_vector_add(&dm->registered_domains, d);
 		return d;
 	}
 	else return NULL;
 }
 
-notifier_domain_t *find_notifier_domain(domain_maintainer_t *dm, const str_t *name)
+/* notifier_domain_t *find_notifier_domain(domain_maintainer_t *dm, const str_t *name)
 {
 	notifier_domain_t *d = NULL;
 	
@@ -92,7 +98,7 @@ notifier_domain_t *find_notifier_domain(domain_maintainer_t *dm, const str_t *na
 	d = find_domain_nolock(dm, name);
 	cds_mutex_unlock(&dm->mutex);
 	return d;
-}
+} */
 
 notifier_domain_t *register_notifier_domain(domain_maintainer_t *dm, const str_t *name)
 {
@@ -103,12 +109,39 @@ notifier_domain_t *register_notifier_domain(domain_maintainer_t *dm, const str_t
 	cds_mutex_lock(&dm->mutex);
 	d = find_domain_nolock(dm, name);
 	if (!d) d = add_domain_nolock(dm, name);
+	if (d) {
+		add_reference(&d->ref); /* add reference for client */
+	}
 	cds_mutex_unlock(&dm->mutex);
 	return d;
 }
 
+static void remove_notifier_domain(domain_maintainer_t *dm, notifier_domain_t *domain)
+{
+	notifier_domain_t *d = NULL;
+	int i, cnt;
+	
+	cnt = ptr_vector_size(&dm->registered_domains);
+	for (i = 0; i < cnt; i++) {
+		d = ptr_vector_get(&dm->registered_domains, i);
+		if (d == domain) {
+			ptr_vector_remove(&dm->registered_domains, i);
+			break;
+		}
+	}
+}
+
 void release_notifier_domain(domain_maintainer_t *dm, notifier_domain_t *domain)
 {
-	/* TODO: decrement "domain counter", if 0 => destroy domain */
+	if ((!dm) || (!domain)) return;
+	
+	cds_mutex_lock(&dm->mutex);
+	if (remove_reference(&domain->ref)) {
+		/* last reference */
+		DEBUG_LOG("freeing domain: \'%.*s\'\n", FMT_STR(domain->name));
+		remove_notifier_domain(dm, domain);
+		destroy_notifier_domain(domain);
+	}
+	cds_mutex_unlock(&dm->mutex);
 }
 

+ 10 - 0
lib/presence/notifier.h

@@ -43,6 +43,16 @@ notifier_t *register_notifier(
 	void *user_data);
 
 void unregister_notifier(notifier_domain_t *domain, notifier_t *info);
+
+/** accepts subscription (internaly adds reference to it), thus it can 
+ * be handled by notifier which called this function 
+ * MUST be called in notifier's subscribe function, otherwise the 
+ * subscription can NOT be accepted */
+void accept_subscription(subscription_t *s);
+
+/** releases accepted subscription - MUST be called on all accepted 
+ * subscriptions (only on them!) to be freed from memory !*/
+void release_subscription(subscription_t *s);
 	
 #ifdef __cplusplus
 }

+ 72 - 12
lib/presence/notifier_domain.c

@@ -31,6 +31,10 @@
 #include <presence/notifier.h>
 #include <presence/subscriber.h>
 #include <cds/list.h>
+#include <cds/cds.h>
+
+#define lock_subscription_data(s) if (s->mutex) cds_mutex_lock(s->mutex);
+#define unlock_subscription_data(s) if (s->mutex) cds_mutex_unlock(s->mutex);
 
 static void free_notifier(notifier_t *info);
 static void free_subscription(subscription_t *s);
@@ -89,27 +93,27 @@ static notifier_package_t *get_package(notifier_domain_t *d, const str_t *name)
 	
 static void destroy_package(notifier_package_t *p) 
 {
-	notifier_t *e, *n;
+	/* notifier_t *e, *n; */
 	subscription_t *s, *ns;
 	
-	/* release all subscriptions  */
+	/* release all subscriptions ???  */
 	s = p->first_subscription;
 	while (s) {
 		ns = s->next;
-		/* unsubscribe(p->domain, s) */
-		/* release_subscription(s); */
-		free_subscription(s);
+		/* CAN NOT be called !!!! : unsubscribe(p->domain, s) */
+		release_subscription(s);
 		s = ns;
 	}
 	
-	/* release all registered notifiers */
-	e = p->first_notifier;
+	/* !!! don't release notifiers - its their job !!! */
+	/* it may lead to errors there */
+	/* e = p->first_notifier;
 	while (e) {
 		n = e->next;
 		free_notifier(e);
-		/* maybe: call some notifier callback ? */
 		e = n;
-	}
+	} */
+	
 	p->first_notifier = NULL;
 	p->last_notifier = NULL;
 	str_free_content(&p->name);
@@ -170,6 +174,8 @@ notifier_domain_t *create_notifier_domain(const str_t *name)
 		d->last_package = NULL;
 		str_dup(&d->name, name);
 		cds_mutex_init(&d->mutex);
+		cds_mutex_init(&d->data_mutex);
+		init_reference_counter(&d->ref);
 	}
 	return d;
 }
@@ -180,6 +186,10 @@ notifier_domain_t *create_notifier_domain(const str_t *name)
 void destroy_notifier_domain(notifier_domain_t *domain)
 {
 	notifier_package_t *p, *n;
+
+	/* this function is always called only if no only one reference
+	 * to domain exists (see domain maintainer), this should mean, that 
+	 * all subscribers freed their subscriptions */
 	
 	lock_notifier_domain(domain);
 	
@@ -197,6 +207,7 @@ void destroy_notifier_domain(notifier_domain_t *domain)
 	
 	str_free_content(&domain->name);
 	cds_mutex_destroy(&domain->mutex);
+	cds_mutex_init(&domain->data_mutex);
 	cds_free(domain);
 }
 
@@ -257,10 +268,13 @@ void unregister_notifier(notifier_domain_t *domain, notifier_t *info)
 	
 	p = info->package;
 	if (p) {
+		/* accepted subscriptions MUST be removed by the notifier 
+		 * how to solve this ? */
+		
 		/* subscription_t *s;
 		s = p->first_subscription;
 		while (s) {
-			info->unsubscribe(info, s);
+			CAN NOT be called !!!!! info->unsubscribe(info, s);
 			s = s->next;
 		}*/
 
@@ -303,18 +317,25 @@ subscription_t *subscribe(notifier_domain_t *domain,
 
 	s->package = p;
 	s->dst = dst;
+	s->mutex = &domain->data_mutex;
 	s->subscriber_data = subscriber_data;
 	str_dup(&s->record_id, record_id);
 	str_dup(&s->subscriber_id, subscriber_id);
+	init_reference_counter(&s->ref);
 
 	DOUBLE_LINKED_LIST_ADD(p->first_subscription, p->last_subscription, s);
 
+	/* add a reference for calling subscriber */
+	add_reference(&s->ref);
+	
 	/* browse all notifiers in given package and subscribe to them
 	 * and add them to notifiers list */
 	cnt = 0;
 	e = p->first_notifier;
 	while (e) {
 		cnt++;
+		/* each notifier MUST add its own reference if
+		 * it wants to accept the subscription !!! */
 		e->subscribe(e, s);
 		e = e->next;
 	}
@@ -323,13 +344,30 @@ subscription_t *subscribe(notifier_domain_t *domain,
 	
 	return s;
 }
+	
+void release_subscription(subscription_t *s)
+{
+	if (!s) return;
+	if (remove_reference(&s->ref)) free_subscription(s);
+}
 
-/** Destroys an existing subscription */
+void accept_subscription(subscription_t *s)
+{
+	if (!s) return;
+	add_reference(&s->ref);
+}
+
+/** Destroys an existing subscription - can be called ONLY by client !!! */
 void unsubscribe(notifier_domain_t *domain, subscription_t *s)
 {
 	notifier_package_t *p;
 	notifier_t *e;
 
+	/* mark subscription as un-notifyable */
+	lock_subscription_data(s);
+	s->dst = NULL;
+	unlock_subscription_data(s);
+
 	lock_notifier_domain(domain);
 	
 	/* maybe: test if the SUBSCRIBER is subscribed before unsubsc. */
@@ -349,6 +387,28 @@ void unsubscribe(notifier_domain_t *domain, subscription_t *s)
 	
 	unlock_notifier_domain(domain);
 	
-	free_subscription(s);
+	/* remove clients reference (dont give references to client?) */
+	remove_reference(&s->ref);
+	
+	release_subscription(s); 
 }
 
+void notify_subscriber(subscription_t *s, mq_message_t *msg)
+{
+	int sent = 0;
+	
+	if (s) {
+		lock_subscription_data(s);
+		if (s->dst) {
+			push_message(s->dst, msg);
+			sent = 1;
+		}
+		else free_message(msg);
+		unlock_subscription_data(s);
+	}
+	
+	if (!sent) {
+		/* free unsent messages */
+		free_message(msg);
+	}
+}

+ 5 - 0
lib/presence/notifier_domain.h

@@ -30,6 +30,7 @@
 #include <cds/ptr_vector.h>
 #include <cds/sync.h>
 #include <cds/msg_queue.h>
+#include <cds/ref_cntr.h>
 
 #include <presence/client_notify_info.h>
 
@@ -53,12 +54,14 @@ typedef struct _notifier_domain_t notifier_domain_t;
  */
 struct _subscription_t {
 	/* client_notify_func notify; */
+	cds_mutex_t *mutex;
 	msg_queue_t *dst;
 	str_t record_id;
 	str_t subscriber_id;
 	notifier_package_t *package;
 	void *subscriber_data;
 	struct _subscription_t *prev, *next;
+	reference_counter_data_t ref;
 };
 
 typedef int (*server_subscribe_func)(notifier_t *n, subscription_t *subscription);
@@ -85,8 +88,10 @@ struct _notifier_package_t {
 
 struct _notifier_domain_t {
 	cds_mutex_t mutex;
+	cds_mutex_t data_mutex; /* mutex for locking standalone subscription data, may be changed to mutex pool */
 	str_t name;
 	notifier_package_t *first_package, *last_package;
+	reference_counter_data_t ref;
 };
 
 /* -------- Domain initialization/destruction functions -------- */

+ 48 - 16
lib/presence/qsa.c

@@ -25,43 +25,74 @@
 
 #include <presence/qsa.h>
 #include <cds/logger.h>
+#include <cds/cds.h>
 #include <presence/domain_maintainer.h>
 
-static domain_maintainer_t *dm = NULL;
-static int initialized = 0;
+typedef struct {
+	int init_cnt;
+	domain_maintainer_t *dm;
+} init_data_t;
+
+static init_data_t *init = NULL;
 
 int qsa_initialize()
 {
-	if (!initialized) {
-		dm = create_domain_maintainer();
-		if (dm) initialized = 1;
-		else {
+	int res = 0;
+
+	cds_initialize();
+	
+	/* initialization should be called from one process/thread 
+	 * it is not synchronized because it is impossible ! */
+	if (!init) {
+		init = (init_data_t*)cds_malloc(sizeof(init_data_t));
+		if (!init) return -1;
+		init->init_cnt = 0;
+	}
+
+	if (init->init_cnt > 0) { /* already initialized */
+		init->init_cnt++;
+		return 0;
+	}
+	else {
+		DEBUG_LOG("qsa_initialize(): init the content\n");
+
+		/* !!! put the real initialization here !!! */
+		init->dm = create_domain_maintainer();
+		if (!init->dm) {
 			ERROR_LOG("qsa_initialize error - can't initialize domain maintainer\n");
-			return -1;
+			res = -1;
 		}
-		DEBUG_LOG("QSA initialized\n");
 	}
-	return 0;
+			
+	if (res == 0) init->init_cnt++;
+	return res;
 }
 
 void qsa_cleanup() 
 {
-	if (initialized && dm) {
-		destroy_domain_maintainer(dm);
-		dm = NULL;
-		initialized = 0;
+	if (init) {
+		if (--init->init_cnt == 0) {
+			DEBUG_LOG("qsa_cleanup(): cleaning the content\n");
+			
+			/* !!! put the real destruction here !!! */
+			if (init->dm) destroy_domain_maintainer(init->dm);
+			
+			cds_free(init);
+			init = NULL;
+		}
 	}
+	cds_cleanup();
 }
 
 notifier_domain_t *qsa_register_domain(const str_t *name)
 {
 	notifier_domain_t *d = NULL;
 
-	if (!dm) {
+	if (!init) {
 		ERROR_LOG("qsa_initialize was not called - can't register domain\n");
 		return NULL;
 	}
-	d = register_notifier_domain(dm, name);
+	if (init->dm) d = register_notifier_domain(init->dm, name);
 	return d;
 }
 
@@ -72,5 +103,6 @@ notifier_domain_t *qsa_get_default_domain()
 
 void qsa_release_domain(notifier_domain_t *domain)
 {
-	if (dm) release_notifier_domain(dm, domain);
+	if (init) 
+		if (init->dm) release_notifier_domain(init->dm, domain);
 }