[Freeswitch-svn] [commit] r13746 - freeswitch/trunk/src/mod/event_handlers/mod_erlang_event

FreeSWITCH SVN mrene at freeswitch.org
Tue Jun 9 23:39:09 PDT 2009


Author: mrene
Date: Wed Jun 10 01:39:08 2009
New Revision: 13746

Log:
FSCORE-379

Modified:
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	Wed Jun 10 01:39:08 2009
@@ -998,6 +998,7 @@
 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
 		return NULL;
 	}
+	memset(listener, 0, sizeof(*listener));
 
 	switch_thread_rwlock_create(&listener->rwlock, listener_pool);
 	switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
@@ -1039,126 +1040,145 @@
 	return listener;
 }
 
+static switch_status_t state_handler(switch_core_session_t *session)
+{
+	switch_channel_t *channel = switch_core_session_get_channel(session);
+	switch_channel_state_t state = switch_channel_get_state(channel);
+	
+	if (state >= CS_HANGUP)  {
+		session_elem_t *session_element = switch_channel_get_private(channel, "_erlang_session_");
+		listener_t* listener = switch_channel_get_private(channel, "_erlang_listener_");
+		
+		if (session_element && listener) {
+			remove_session_elem_from_listener(listener, session_element);
+		}
+
+		switch_core_event_hook_remove_state_change(session, state_handler);
+	}
+
+	return SWITCH_STATUS_SUCCESS;
+}
+
+session_elem_t *session_elem_create(listener_t* listener, switch_core_session_t *session) 
+{
+	/* create a session list element */
+	session_elem_t* session_element = switch_core_session_alloc(session, sizeof(*session_element));
+	switch_channel_t *channel = switch_core_session_get_channel(session);
+
+	memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH);
+	
+	switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
+	switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
+	
+	switch_channel_set_private(channel, "_erlang_session_", session_element);
+	switch_channel_set_private(channel, "_erlang_listener_", listener);
+	
+	switch_core_event_hook_add_state_change(session, state_handler);
+	
+	return session_element;
+}
 
 session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session)
 {
 	/* create a session list element */
-	session_elem_t* session_element = NULL;
-	if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n");
-	} else {
-		memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH);
-		session_element->process.type = ERLANG_REG_PROCESS;
-		session_element->process.reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name);
-		switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
-		switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
-		switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
-		switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
-		/* attach the session to the listener */
-		add_session_elem_to_listener(listener,session_element);
-	}
+	session_elem_t* session_element = session_elem_create(listener, session);
+
+    session_element->process.type = ERLANG_REG_PROCESS;
+    session_element->process.reg_name = switch_core_session_strdup(session, reg_name);
+    switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
+    /* attach the session to the listener */
+    add_session_elem_to_listener(listener,session_element);
+
 	return session_element;
 }
 
 session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session)
 {
 	/* create a session list element */
-	session_elem_t* session_element = NULL;
-	if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n");
-	} else {
-		memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH);
-		session_element->process.type = ERLANG_PID;
-		memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
-		switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
-		switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
-		switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
-		switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
-		/* attach the session to the listener */
-		add_session_elem_to_listener(listener,session_element);
-
-		ei_link(listener, ei_self(listener->ec), pid);
-	}
+	session_elem_t* session_element = session_elem_create(listener, session);
+	
+	session_element->process.type = ERLANG_PID;
+	memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
+	switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
+	/* attach the session to the listener */
+	add_session_elem_to_listener(listener,session_element);
+	ei_link(listener, ei_self(listener->ec), pid);
+	
 	return session_element;
 }
 
 session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session)
 {
 	/* create a session list element */
-	session_elem_t* session_element=NULL;
-	if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n");
-	} else {
-		char hash[100];
-		int i = 0;
-		void *p = NULL;
-
-		memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH);
-		erlang_pid *pid;
-		erlang_ref ref;
-
-		switch_set_flag(session_element, LFLAG_WAITING_FOR_PID);
-		switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
-		switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
-		/* attach the session to the listener */
-		add_session_elem_to_listener(listener,session_element);
-
-		ei_init_ref(listener->ec, &ref);
-		ei_hash_ref(&ref, hash);
-		/* insert the waiting marker */
-		switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING);
-
-		if (!strcmp(function, "!")) {
-			/* send a message to request a pid */
-			ei_x_buff rbuf;
-			ei_x_new_with_version(&rbuf);
-
-			ei_x_encode_tuple_header(&rbuf, 3);
-			ei_x_encode_atom(&rbuf, "new_pid");
-			ei_x_encode_ref(&rbuf, &ref);
-			ei_x_encode_pid(&rbuf, ei_self(listener->ec));
-			/* should lock with mutex? */
-			ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index);
+	session_elem_t* session_element = session_elem_create(listener, session);
+	char hash[100];
+	int i = 0;
+	void *p = NULL;
+	erlang_pid *pid;
+	erlang_ref ref;
+
+	switch_set_flag(session_element, LFLAG_WAITING_FOR_PID);
+	
+	/* attach the session to the listener */
+	add_session_elem_to_listener(listener,session_element);
+
+	ei_init_ref(listener->ec, &ref);
+	ei_hash_ref(&ref, hash);
+	/* insert the waiting marker */
+	switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING);
+
+	if (!strcmp(function, "!")) {
+		/* send a message to request a pid */
+		ei_x_buff rbuf;
+		ei_x_new_with_version(&rbuf);
+
+		ei_x_encode_tuple_header(&rbuf, 3);
+		ei_x_encode_atom(&rbuf, "new_pid");
+		ei_x_encode_ref(&rbuf, &ref);
+		ei_x_encode_pid(&rbuf, ei_self(listener->ec));
+		/* should lock with mutex? */
+		ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index);
 #ifdef EI_DEBUG
-			ei_x_print_reg_msg(&rbuf, module, 1);
+		ei_x_print_reg_msg(&rbuf, module, 1);
 #endif
-			ei_x_free(&rbuf);
-		} else {
-			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function);
-			/* should lock with mutex? */
-			ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function);
-			/*
-			   char *argv[1];
-			   ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv);
-			   */
-		}
-
-		/* loop until either we timeout or we get a value that's not the waiting marker */
-		while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) {
-			if (i > 50) { /* half a second timeout */
-				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n");
-				remove_session_elem_from_listener(listener,session_element);
-				switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */
-				return NULL;
-			}
-			i++;
-			switch_yield(10000); /* 10ms */
+		ei_x_free(&rbuf);
+	} else {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function);
+		/* should lock with mutex? */
+		ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function);
+		/*
+		   char *argv[1];
+		   ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv);
+		   */
+	}
+
+	/* loop until either we timeout or we get a value that's not the waiting marker */
+	while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) {
+		if (i > 50) { /* half a second timeout */
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n");
+			remove_session_elem_from_listener(listener,session_element);
+			switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */
+			return NULL;
 		}
+		i++;
+		switch_yield(10000); /* 10ms */
+	}
 
-		switch_core_hash_delete(listener->spawn_pid_hash, hash);
+	switch_core_hash_delete(listener->spawn_pid_hash, hash);
 
-		pid = (erlang_pid *) p;
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n");
+	pid = (erlang_pid *) p;
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n");
 
-		session_element->process.type = ERLANG_PID;
-		memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
-		switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
-		switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
-		switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
+	session_element->process.type = ERLANG_PID;
+	memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
+	
+	switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
+	switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
+	switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
+
+	ei_link(listener, ei_self(listener->ec), pid);
+	switch_safe_free(pid); /* malloced in handle_ref_tuple */
 
-		ei_link(listener, ei_self(listener->ec), pid);
-		switch_safe_free(pid); /* malloced in handle_ref_tuple */
-	}
 	return session_element;
 }
 



More information about the Freeswitch-svn mailing list