[Freeswitch-svn] [commit] r10403 - freeswitch/trunk/src/mod/event_handlers/mod_erlang_event

FreeSWITCH SVN andrew at freeswitch.org
Fri Nov 14 09:55:21 PST 2008


Author: andrew
Date: Fri Nov 14 12:55:20 2008
New Revision: 10403

Log:
Ton of stuff, mainly laying groundwork for bind_search functionality


Modified:
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	Fri Nov 14 12:55:20 2008
@@ -53,14 +53,6 @@
 	LFLAG_STATEFUL = (1 << 8)
 } event_flag_t;
 
-
-/* TODO - support multiple event handlers per erlang connection each with their own event filters? */
-struct event_handler {
-	erlang_pid pid;
-	switch_hash_t *event_hash;
-	struct event_handler *next;
-};
-
 struct listener {
 	int sockfd;
 	struct ei_cnode_s *ec;
@@ -70,6 +62,7 @@
 	switch_queue_t *log_queue;
 	switch_memory_pool_t *pool;
 	switch_mutex_t *flag_mutex;
+	switch_mutex_t *sock_mutex;
 	char *ebuf;
 	uint32_t flags;
 	switch_log_level_t level;
@@ -98,6 +91,19 @@
 
 #define MAX_ACL 100
 
+struct erlang_binding { /* one XML section binding requested by an Erlang process */
+	switch_xml_section_t section; /* section parsed from the atom sent with the bind request */
+	erlang_pid pid; /* sender of the bind request */
+	char *registered_process; /* TODO */
+	listener_t *listener; /* connection the bind request arrived on */
+	struct erlang_binding *next; /* next binding in the list */
+};
+
+static struct { /* module-wide binding state; zeroed by the load hook */
+	struct erlang_binding *head; /* first entry of the binding list, NULL when there are none */
+	switch_xml_binding_t *search_binding; /* handle filled in by switch_xml_bind_search_function_ret */
+} bindings;
+
 static struct {
 	switch_mutex_t *mutex;
 	char *ip;
@@ -163,6 +169,49 @@
 }
 
 
+/* ei_link() support, taken from code added to ei in R12B-5; not everyone
+ * has that version yet, so we provide our own copy.
+ * put8/put32be store n at s as one byte / four bytes (most significant
+ * first) and then advance s past the bytes written. */
+
+#define put8(s,n) do { \
+	  (s)[0] = (char)((n) & 0xff); \
+	  (s) += 1; \
+} while (0)
+
+#define put32be(s,n) do {  \
+	  (s)[0] = ((n) >>  24) & 0xff; \
+	  (s)[1] = ((n) >>  16) & 0xff; \
+	  (s)[2] = ((n) >>  8) & 0xff;  \
+	  (s)[3] = (n) & 0xff; \
+	  (s) += 4; \
+} while (0)
+
+/* Send an ERL_LINK control message for the pids `from` and `to` on the listener's socket.
+ * Wire layout: 4 byte big-endian length, ERL_PASS_THROUGH, then the encoded 3-tuple. */
+static void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) {
+	char msgbuf[2048];
+	char *s = msgbuf;
+	int index = 5, sent = 0, n = 0;                /* encoding starts after the 5 byte header; max sizes: */
+	ei_encode_version(msgbuf,&index);                     /*   1 */
+	ei_encode_tuple_header(msgbuf,&index,3);
+	ei_encode_long(msgbuf,&index,ERL_LINK);
+	ei_encode_pid(msgbuf,&index,from);                    /* 268 */
+	ei_encode_pid(msgbuf,&index,to);                      /* 268 */
+	put32be(s, index - 4);                                /*   4 */
+	put8(s, ERL_PASS_THROUGH);                            /*   1, sum: 542 */
+
+	switch_mutex_lock(listener->sock_mutex);
+	while (sent < index && (n = (int) write(listener->sockfd, msgbuf + sent, index - sent)) > 0) {
+		sent += n; /* write() may accept only part of the buffer */
+	}
+	switch_mutex_unlock(listener->sock_mutex);
+	if (sent < index) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send link message (%d of %d bytes written)\n", sent, index);
+	}
+}
+
+
 static void expire_listener(listener_t **listener)
 {
 	void *pop;
@@ -180,6 +229,38 @@
 }
 
 
+/* Drop every binding that belongs to `listener` and recompute the section mask of the
+ * XML search binding from the bindings that are left.
+ *
+ * The walk keeps a pointer to the link that leads to the current node, so unlinking is
+ * always done through the live predecessor; this also holds when several bindings of
+ * the same listener sit next to each other in the list.
+ * The nodes themselves are not freed here (the bind handler takes them from
+ * listener->pool). */
+static void remove_binding(listener_t *listener) {
+	struct erlang_binding **link = &bindings.head;
+	/* same base value the load hook registers with, before any section is bound */
+	switch_xml_section_t sections = (1 << sizeof(switch_xml_section_enum_t));
+
+	switch_mutex_lock(globals.listener_mutex);
+
+	while (*link) {
+		if ((*link)->listener == listener) {
+			/* unlink; *link now refers to the successor, so do not advance */
+			*link = (*link)->next;
+		} else {
+			/* survivor: keep its section enabled */
+			sections |= (*link)->section;
+			link = &(*link)->next;
+		}
+	}
+
+	/* publish the rebuilt mask in one step */
+	switch_xml_set_binding_sections(bindings.search_binding, sections);
+	switch_mutex_unlock(globals.listener_mutex);
+}
+
+
 static void ei_encode_switch_event(ei_x_buff *ebuf, switch_event_t *event)
 {
 	int i;
@@ -295,40 +376,6 @@
 }
 
 
-SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
-{
-	listener_t *l;
-	int sanity = 0;
-
-	prefs.done = 1;
-
-	switch_log_unbind_logger(socket_logger);
-
-	/*close_socket(&listen_list.sockfd);*/
-	
-	while (prefs.threads || prefs.done == 1) {
-		switch_yield(10000);
-		if (++sanity == 1000) {
-			break;
-		}
-	}
-
-	switch_event_unbind(&globals.node);
-
-	switch_mutex_lock(globals.listener_mutex);
-
-	for (l = listen_list.listeners; l; l = l->next) {
-		close_socket(&l->sockfd);
-	}
-
-	switch_mutex_unlock(globals.listener_mutex);
-
-	switch_sleep(1500000); /* sleep for 1.5 seconds */
-
-	return SWITCH_STATUS_SUCCESS;
-}
-
-
 static void add_listener(listener_t *listener)
 {
 	/* add me to the listeners so I get events */
@@ -357,25 +404,6 @@
 	switch_mutex_unlock(globals.listener_mutex);
 }
 
-SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
-{
-	switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
-
-	if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
-		close_socket(&listen_list.sockfd);
-		return SWITCH_STATUS_GENERR;
-	}
-
-	switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
-
-	/* connect my internal structure to the blank pointer passed to me */
-	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
-
-	/* indicate that the module should continue to be loaded */
-	return SWITCH_STATUS_SUCCESS;
-}
-
 
 struct api_command_struct {
 	char *api_cmd;
@@ -456,7 +484,9 @@
 			ei_x_encode_string(&ebuf, acs->uuid_str);
 			ei_x_encode_string(&ebuf, reply);
 
+			switch_mutex_lock(acs->listener->sock_mutex);
 			ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
+			switch_mutex_unlock(acs->listener->sock_mutex);
 
 			ei_x_free(&ebuf);
 		}
@@ -478,7 +508,11 @@
 		
 		ei_x_encode_string(&rbuf, reply);
 
+
+		switch_mutex_lock(acs->listener->sock_mutex);
 		ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
+		switch_mutex_unlock(acs->listener->sock_mutex);
+
 		ei_x_free(&rbuf);
 	}
 
@@ -500,6 +534,38 @@
 
 }
 
+/* XML search callback registered by the load hook. For now it only locates the first
+ * binding for the requested section, logs what it found and returns no document. */
+static switch_xml_t erlang_fetch (const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value,
+								switch_event_t *params, void *user_data)
+{
+	switch_xml_t result = NULL;
+	struct erlang_binding *match;
+	switch_xml_section_t wanted;
+
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n");
+	wanted = switch_xml_parse_section_string((char *) sectionstr);
+
+	/* only the first binding on the requested section is considered */
+	match = bindings.head;
+	while (match && match->section != wanted) {
+		match = match->next;
+	}
+
+	if (match && match->listener) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, match->pid.node);
+		/* placeholder: take and release the socket lock of the owning connection */
+		switch_mutex_lock(match->listener->sock_mutex);
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "It's a lock!\n");
+		switch_mutex_unlock(match->listener->sock_mutex);
+	} else if (!match) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
+	} else {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n");
+	}
+	return result;
+}
+
 
 static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
 {
@@ -829,6 +895,57 @@
 				break;
 			}
 
+		} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
+
+			/* format is (result|config|directory|dialplan|phrases)  */
+			char sectionstr[MAXATOMLEN];
+
+			if (ei_decode_atom(buf->buff, &buf->index, sectionstr)) {
+				ei_x_encode_tuple_header(rbuf, 2);
+				ei_x_encode_atom(rbuf, "error");
+				ei_x_encode_atom(rbuf, "badarg");
+				break;
+			}
+
+			switch_xml_section_t section;
+
+			if (!(section = switch_xml_parse_section_string(sectionstr))) {
+				ei_x_encode_tuple_header(rbuf, 2);
+				ei_x_encode_atom(rbuf, "error");
+				ei_x_encode_atom(rbuf, "badarg");
+				break;
+			}
+
+			struct erlang_binding *binding, *ptr;
+
+			if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
+				ei_x_encode_tuple_header(rbuf, 2);
+				ei_x_encode_atom(rbuf, "error");
+				ei_x_encode_atom(rbuf, "badmem");
+				break;
+			}
+
+			binding->section = section;
+			binding->pid = msg->from;
+			binding->listener = listener;
+
+			switch_mutex_lock(globals.listener_mutex);
+
+			for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
+
+			if (ptr) {
+				ptr->next = binding;
+			} else {
+				bindings.head = binding;
+			}
+			
+			switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
+			switch_mutex_unlock(globals.listener_mutex);
+
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
+
+			ei_link(listener, ei_self(listener->ec), &msg->from);
 
 		} else {
 			ei_x_encode_tuple_header(rbuf, 2);
@@ -852,10 +969,12 @@
 				switch_clear_flag_locked(listener, LFLAG_LOG);
 			}
 		} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
+			ei_link(listener, ei_self(listener->ec), &msg->from);
 			listener->log_pid = msg->from;
 			listener->level = SWITCH_LOG_DEBUG;
 			switch_set_flag(listener, LFLAG_LOG);
 		} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
+			ei_link(listener, ei_self(listener->ec), &msg->from);
 			listener->event_pid = msg->from;
 			if (!switch_test_flag(listener, LFLAG_EVENTS)) {
 				switch_set_flag_locked(listener, LFLAG_EVENTS);
@@ -888,6 +1007,10 @@
 			ei_x_encode_tuple_header(rbuf, 2);
 			ei_x_encode_atom(rbuf, "ok");
 			ei_x_encode_pid(rbuf, ei_self(listener->ec));
+		} else if (!strncmp(atom, "link", MAXATOMLEN)) {
+			/* debugging */
+			ei_link(listener, ei_self(listener->ec), &msg->from);
+			goto noreply;
 		} else {
 			ei_x_encode_tuple_header(rbuf, 2);
 			ei_x_encode_atom(rbuf, "error");
@@ -905,12 +1028,17 @@
 		break;
 	}
 
+	
+	switch_mutex_lock(listener->sock_mutex);
 	ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
+	switch_mutex_unlock(listener->sock_mutex);
 noreply:
 	return 0;
 
 event_done:
+	switch_mutex_lock(listener->sock_mutex);
 	ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
+	switch_mutex_unlock(listener->sock_mutex);
 	return 1;
 }
 
@@ -985,7 +1113,9 @@
 		ei_x_buff rbuf;
 		ei_x_new_with_version(&rbuf);
 
+		switch_mutex_lock(listener->sock_mutex);
 		status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 100);
+		switch_mutex_unlock(listener->sock_mutex);
 
 		switch(status) {
 			case ERL_TICK :
@@ -1043,7 +1173,11 @@
 					ei_x_encode_tuple_header(&lbuf, 2);
 					ei_x_encode_atom(&lbuf, "log");
 					ei_x_encode_string(&lbuf, data);
+
+					switch_mutex_lock(listener->sock_mutex);
 					ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index);
+					switch_mutex_unlock(listener->sock_mutex);
+
 					ei_x_free(&lbuf);
 				}
 			}
@@ -1060,7 +1194,9 @@
 
 				ei_encode_switch_event(&ebuf, pevent);
 
+				switch_mutex_lock(listener->sock_mutex);
 				ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index);
+				switch_mutex_unlock(listener->sock_mutex);
 
 				ei_x_free(&ebuf);
 				switch_event_destroy(&pevent);
@@ -1084,6 +1220,9 @@
 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n");
 	switch_core_hash_destroy(&listener->event_hash);
 
+	/* remove any bindings for this connection */
+	remove_binding(listener);
+
 	if (listener->session) {
 		switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
 		switch_clear_flag_locked(listener, LFLAG_SESSION);
@@ -1093,6 +1232,7 @@
 		switch_core_destroy_memory_pool(&pool);
 	}
 
+
 	switch_mutex_lock(globals.listener_mutex);
 	prefs.threads--;
 	switch_mutex_unlock(globals.listener_mutex);
@@ -1111,6 +1251,7 @@
 	switch_threadattr_detach_set(thd_attr, 1);
 	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
 	switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
+
 }
 
 
@@ -1173,6 +1314,39 @@
 }
 
 
+/* Module Hooks */
+
+
+/* Module load hook: sets up the listener mutex, subscribes event_handler to all events,
+ * attaches the log handler and registers erlang_fetch as an XML search function. */
+SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
+{
+	switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
+
+	if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
+		close_socket(&listen_list.sockfd);
+		return SWITCH_STATUS_GENERR;
+	}
+
+	switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
+	memset(&bindings, 0, sizeof(bindings));
+
+	if (switch_xml_bind_search_function_ret(erlang_fetch, (1 << sizeof(switch_xml_section_enum_t)), NULL, &bindings.search_binding) != SWITCH_STATUS_SUCCESS) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
+		/* undo the registrations made above so nothing keeps calling into a module that failed to load */
+		switch_log_unbind_logger(socket_logger);
+		switch_event_unbind(&globals.node);
+		close_socket(&listen_list.sockfd);
+		return SWITCH_STATUS_GENERR;
+	}
+
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
+	*module_interface = switch_loadable_module_create_module_interface(pool, modname); /* hand our interface back to the core */
+	return SWITCH_STATUS_SUCCESS; /* indicate that the module should continue to be loaded */
+}
+
+
 SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
 {
 	switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
@@ -1328,6 +1502,7 @@
 		listener_pool = NULL;
 		listener->level = SWITCH_LOG_DEBUG;
 		switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+		switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
 		switch_core_hash_init(&listener->event_hash, listener->pool);
 
 		launch_listener_thread(listener);
@@ -1358,6 +1533,41 @@
 	return SWITCH_STATUS_TERM;
 }
 
+/* Module shutdown hook: flags shutdown, waits (bounded) for worker threads, then unbinds handlers and closes listener sockets. */
+SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
+{
+	listener_t *l;
+	int sanity = 0;
+
+	prefs.done = 1; /* shutdown request; presumably polled by the runtime/listener loops - confirm */
+
+	switch_log_unbind_logger(socket_logger);
+
+	/*close_socket(&listen_list.sockfd);*/
+	/* give up after 1000 polls even if threads remain or prefs.done is still 1 */
+	while (prefs.threads || prefs.done == 1) {
+		switch_yield(10000);
+		if (++sanity == 1000) {
+			break;
+		}
+	}
+
+	switch_event_unbind(&globals.node);
+	switch_xml_unbind_search_function_ptr(erlang_fetch); /* undo the search binding made in the load hook */
+
+	switch_mutex_lock(globals.listener_mutex);
+
+	for (l = listen_list.listeners; l; l = l->next) {
+		close_socket(&l->sockfd);
+	}
+
+	switch_mutex_unlock(globals.listener_mutex);
+
+	switch_sleep(1500000); /* sleep for 1.5 seconds */
+
+	return SWITCH_STATUS_SUCCESS;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c



More information about the Freeswitch-svn mailing list