[Freeswitch-svn] [commit] r7891 - freeswitch/trunk/src

Freeswitch SVN anthm at freeswitch.org
Fri Mar 14 12:21:13 EDT 2008


Author: anthm
Date: Fri Mar 14 12:21:13 2008
New Revision: 7891

Modified:
   freeswitch/trunk/src/switch_core_session.c
   freeswitch/trunk/src/switch_event.c

Log:
some more stuff

Modified: freeswitch/trunk/src/switch_core_session.c
==============================================================================
--- freeswitch/trunk/src/switch_core_session.c	(original)
+++ freeswitch/trunk/src/switch_core_session.c	Fri Mar 14 12:21:13 2008
@@ -739,6 +739,7 @@
 			switch_assert(event_str);
 			switch_core_memory_pool_tag(switch_core_session_get_pool(session), switch_core_session_strdup(session, event_str));
 			free(event_str);
+			switch_event_destroy(&event);
 		}
 	}
 

Modified: freeswitch/trunk/src/switch_event.c
==============================================================================
--- freeswitch/trunk/src/switch_event.c	(original)
+++ freeswitch/trunk/src/switch_event.c	Fri Mar 14 12:21:13 2008
@@ -35,7 +35,8 @@
 #include <switch.h>
 #include <switch_event.h>
 #define MAX_DISPATCH 20
-#define DISPATCH_QUEUE_LEN 1000
+#define DISPATCH_QUEUE_LEN 5000
+//#define DEBUG_DISPATCH_QUEUES
 
 static int SOFT_MAX_DISPATCH = 0;
 static char hostname[128] = "";
@@ -44,18 +45,15 @@
 static switch_event_node_t *EVENT_NODES[SWITCH_EVENT_ALL + 1] = { NULL };
 static switch_mutex_t *BLOCK = NULL;
 static switch_mutex_t *POOL_LOCK = NULL;
-static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
-static switch_mutex_t *EVENT_QUEUE_HAVEMORE_MUTEX = NULL;
-static switch_thread_cond_t *EVENT_QUEUE_CONDITIONAL = NULL;
 static switch_memory_pool_t *RUNTIME_POOL = NULL;
 static switch_memory_pool_t *THRUNTIME_POOL = NULL;
 static switch_queue_t *EVENT_QUEUE[3] = { 0, 0, 0 };
 static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH] = { 0 };
 static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN;
-
+static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
 static switch_hash_t *CUSTOM_HASH = NULL;
-static int THREAD_RUNNING = 0;
-static int EVENT_QUEUE_HAVEMORE = 0;
+static int THREAD_COUNT = 0;
+static int SYSTEM_RUNNING = 0;
 static switch_queue_t *EVENT_RECYCLE_QUEUE = NULL;
 static switch_queue_t *EVENT_HEADER_RECYCLE_QUEUE = NULL;
 static void launch_dispatch_threads(int max, int len, switch_memory_pool_t *pool);
@@ -176,12 +174,21 @@
 static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t * thread, void *obj)
 {
 	switch_queue_t *queue = (switch_queue_t *) obj;
-	int THREAD_RUNNING = 1;
-	
-	while (THREAD_RUNNING == 1) {
+	int my_id = 0;
+	switch_mutex_lock(EVENT_QUEUE_MUTEX);
+	THREAD_COUNT++;
+	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+
+	for (my_id = 0; my_id < MAX_DISPATCH; my_id++) {
+		if (EVENT_DISPATCH_QUEUE[my_id] == queue) {
+			break;
+		}
+	}
+
+	for(;;) {
 		void *pop = NULL;
 		switch_event_t *event = NULL;
-		
+
 		if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
 			break;
 		}
@@ -189,138 +196,80 @@
 		if (!pop) {
 			break;
 		}
-
+		
 		event = (switch_event_t *) pop;
 		switch_event_deliver(&event);
 	}
 
-	THREAD_RUNNING = 0;
-	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Ended.\n");
+
+	switch_mutex_lock(EVENT_QUEUE_MUTEX);
+	THREAD_COUNT--;
+	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+	
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id);
 	return NULL;
 	
 }
 
+
 static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t * thread, void *obj)
 {
-	switch_event_t *out_event = NULL;
-	switch_queue_t *queue = NULL;
-	switch_queue_t *queues[3] = { 0, 0, 0 };
-	void *pop;
-	int i, len[3] = { 0, 0, 0 };
+	switch_queue_t *queue = (switch_queue_t *) obj;
 	int index = 0;
-
-	switch_assert(thread != NULL);
-	switch_assert(obj == NULL);
-	switch_assert(POOL_LOCK != NULL);
-	switch_assert(RUNTIME_POOL != NULL);
-	switch_assert(EVENT_QUEUE_MUTEX != NULL);
-	switch_assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL);
-	switch_assert(EVENT_QUEUE_CONDITIONAL != NULL);
-	THREAD_RUNNING = 1;
-
-	queues[0] = EVENT_QUEUE[SWITCH_PRIORITY_HIGH];
-	queues[1] = EVENT_QUEUE[SWITCH_PRIORITY_NORMAL];
-	queues[2] = EVENT_QUEUE[SWITCH_PRIORITY_LOW];
+	int my_id = 0;
 
 	switch_mutex_lock(EVENT_QUEUE_MUTEX);
+	THREAD_COUNT++;
+	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+
+	for (my_id = 0; my_id < MAX_DISPATCH; my_id++) {
+		if (EVENT_QUEUE[my_id] == queue) {
+			break;
+		}
+	}
+	
+	for(;;) {
+		void *pop = NULL;
+		switch_event_t *event = NULL;
+		
+		if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
+			break;
+		}
 
-	for (;;) {
-		int any;
+		if (!pop) {
+			break;
+		}
 
-		len[1] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_NORMAL]);
-		len[2] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_LOW]);
-		len[0] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_HIGH]);
-		any = len[1] + len[2] + len[0];
-
-		if (!any) {
-			/* lock on havemore so we are the only ones poking at it while we check it
-			 * see if we saw anything in the queues or have a check again flag
-			 */
-			switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX);
-			if (!EVENT_QUEUE_HAVEMORE) {
-				/* See if we need to quit */
-				if (THREAD_RUNNING != 1) {
-					/* give up our lock */
-					switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
+		event = (switch_event_t *) pop;
 
-					/* game over */
+		while(event) {
+			for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
+				if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) {
+					event = NULL;
 					break;
 				}
-
-				/* give up our lock */
-				switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
-
-				/* wait until someone tells us we have something to do */
-				switch_thread_cond_wait(EVENT_QUEUE_CONDITIONAL, EVENT_QUEUE_MUTEX);
-			} else {
-				/* Caught a race, one of the queues was updated after we looked at it 
-				 * reset our flag
-				 */
-				EVENT_QUEUE_HAVEMORE = 0;
-
-				/* give up our lock */
-				switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
 			}
-
-			/* go grab some events */
-			continue;
-		}
-
-		for (i = 0; i < 3; i++) {
-			if (len[i]) {
-
-				queue = queues[i];
-
-				while (queue) {
-					if (switch_queue_trypop(queue, &pop) == SWITCH_STATUS_SUCCESS) {
-						out_event = pop;
-
-						if (!pop) {
-							continue;
-						}
-
-					retry:
-						
-						for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
-							if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], out_event) == SWITCH_STATUS_SUCCESS) {
-								out_event = NULL;
-								break;
-							}
-						}
-
-						if (out_event) {
-							switch_yield(1000);
-
-							if (SOFT_MAX_DISPATCH+1 < MAX_DISPATCH) {
-								launch_dispatch_threads(SOFT_MAX_DISPATCH+1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
-							}
-
-							goto retry;
-						}
-
-#ifdef DEBUG_DISPATCH_QUEUES
-						printf ("SIZE ");
-						for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
-							printf ("%d ", switch_queue_size(EVENT_DISPATCH_QUEUE[index]));
-						}
-						printf("\n");
-#endif
-					} else {
-						break;
-					}
+		
+			if (event) {
+				if (SOFT_MAX_DISPATCH+1 < MAX_DISPATCH) {				
+					switch_mutex_lock(EVENT_QUEUE_MUTEX);			
+					launch_dispatch_threads(SOFT_MAX_DISPATCH+1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
+					switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 				}
 			}
 		}
-
-		if (THREAD_RUNNING < 0) {
-			THREAD_RUNNING--;
-		}
 	}
 
-	THREAD_RUNNING = 0;
+	switch_mutex_lock(EVENT_QUEUE_MUTEX);
+	THREAD_COUNT--;
+	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+	
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread %d Ended.\n", my_id);
 	return NULL;
+	
 }
 
+
 SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event)
 {
 	switch_event_types_t e;
@@ -344,7 +293,7 @@
 
 SWITCH_DECLARE(switch_status_t) switch_event_running(void)
 {
-	return THREAD_RUNNING ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
+	return SYSTEM_RUNNING ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
 }
 
 SWITCH_DECLARE(char *) switch_event_name(switch_event_types_t event)
@@ -417,46 +366,28 @@
 {
 	int x = 0, last = 0;
 
-	if (THREAD_RUNNING > 0) {
-		THREAD_RUNNING = -1;
-
-		/* lock on havemore to make sure the event thread, if currently running
-		 * doesn't check the HAVEMORE flag before we set it
-		 */
-		switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX);
-		/* see if the event thread is sitting */
-		if (switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) {
-			/* we don't need havemore anymore, the thread was sitting already */
-			switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
-
-			/* wake up the event thread */
-			switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL);
-
-			/* give up our lock */
-			switch_mutex_unlock(EVENT_QUEUE_MUTEX);
-		} else {
-			/* it wasn't waiting which means we might have updated a queue it already looked at
-			 * set a flag so it knows to read the queues again
-			 */
-			EVENT_QUEUE_HAVEMORE = 1;
-
-			/* variable updated, give up the mutex */
-			switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
-		}
+	switch_mutex_lock(EVENT_QUEUE_MUTEX);
+	SYSTEM_RUNNING = 0;
+	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
-		for(x = 0; x < SOFT_MAX_DISPATCH; x++) {
-			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x);
-			switch_queue_trypush(EVENT_DISPATCH_QUEUE[x], NULL);
-		}
+	for(x = 0; x < 3; x++) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x);
+		switch_queue_push(EVENT_QUEUE[x], NULL);
+	}
 
-		while (x < 10000 && THREAD_RUNNING) {
-			switch_yield(1000);
-			if (THREAD_RUNNING == last) {
-				x++;
-			}
-			last = THREAD_RUNNING;
+	for(x = 0; x < SOFT_MAX_DISPATCH; x++) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x);
+		switch_queue_push(EVENT_DISPATCH_QUEUE[x], NULL);
+	}
+		
+	while (x < 10000 && THREAD_COUNT) {
+		switch_yield(1000);
+		if (THREAD_COUNT == last) {
+			x++;
 		}
+		last = THREAD_COUNT;
 	}
+	
 
 	switch_core_hash_destroy(&CUSTOM_HASH);
 	switch_core_memory_reclaim_events();
@@ -517,18 +448,24 @@
 	switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
 	switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
 	switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
-	switch_mutex_init(&EVENT_QUEUE_HAVEMORE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
-	switch_thread_cond_create(&EVENT_QUEUE_CONDITIONAL, RUNTIME_POOL);
 	switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL);
 	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
 	switch_threadattr_priority_increase(thd_attr);
 
 	launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
-	switch_thread_create(&thread, thd_attr, switch_event_thread, NULL, RUNTIME_POOL);
-
-	while (!THREAD_RUNNING) {
+	switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
+	switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
+	switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
+	
+	while (!THREAD_COUNT) {
 		switch_yield(1000);
 	}
+
+
+	switch_mutex_lock(EVENT_QUEUE_MUTEX);
+	SYSTEM_RUNNING = 1;
+	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+
 	return SWITCH_STATUS_SUCCESS;
 }
 
@@ -969,12 +906,10 @@
 
 	switch_assert(BLOCK != NULL);
 	switch_assert(RUNTIME_POOL != NULL);
-	switch_assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL);
 	switch_assert(EVENT_QUEUE_MUTEX != NULL);
-	switch_assert(EVENT_QUEUE_CONDITIONAL != NULL);
 	switch_assert(RUNTIME_POOL != NULL);
 
-	if (THREAD_RUNNING <= 0) {
+	if (SYSTEM_RUNNING <= 0) {
 		/* sorry we're closed */
 		switch_event_destroy(event);
 		return SWITCH_STATUS_FALSE;
@@ -1005,32 +940,8 @@
 		(*event)->event_user_data = user_data;
 	}
 	
-	switch_queue_push(EVENT_QUEUE[(*event)->priority], *event);
-
-	/* lock on havemore to make sure he event thread, if currently running
-	 * doesn't check the HAVEMORE flag before we set it
-	 */
-	switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX);
-	/* see if the event thread is sitting */
-	if (switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) {
-		/* we don't need havemore anymore, the thread was sitting already */
-		switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
-
-		/* wake up the event thread */
-		switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL);
-
-		/* give up our lock */
-		switch_mutex_unlock(EVENT_QUEUE_MUTEX);
-	} else {
-		/* it wasn't waiting which means we might have updated a queue it already looked at
-		 * set a flag so it knows to read the queues again
-		 */
-		EVENT_QUEUE_HAVEMORE = 1;
-
-		/* variable updated, give up the mutex */
-		switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
-	}
 
+	switch_queue_push(EVENT_QUEUE[(*event)->priority], *event);
 	*event = NULL;
 
 	return SWITCH_STATUS_SUCCESS;



More information about the Freeswitch-svn mailing list