[Freeswitch-svn] [commit] r11376 - freeswitch/trunk/src/mod/event_handlers/mod_erlang_event

FreeSWITCH SVN andrew at freeswitch.org
Thu Jan 22 11:34:53 PST 2009


Author: andrew
Date: Thu Jan 22 13:34:53 2009
New Revision: 11376

Log:
Merge in Rob Charlton's patch for outbound session support in mod_erlang_event


Added:
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h
Modified:
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/Makefile
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/Makefile
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/Makefile	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/Makefile	Thu Jan 22 13:34:53 2009
@@ -1,5 +1,8 @@
 BASE=../../../..
+LOCAL_SOURCES=handle_msg.c ei_helpers.c
+LOCAL_OBJS=handle_msg.o ei_helpers.o
 include $(BASE)/build/modmake.rules
 
-LOCAL_CFLAGS=-I/usr/local/lib/erlang/lib/erl_interface-3.5.8/include -L/usr/local/lib/erlang/lib/erl_interface-3.5.8/lib/ -D_REENTRANT
+
+LOCAL_CFLAGS=-I/usr/local/lib/erlang/lib/erl_interface-3.5.9/include -L/usr/local/lib/erlang/lib/erl_interface-3.5.9/lib/ -D_REENTRANT
 LOCAL_LDFLAGS=-lei

Added: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c
==============================================================================
--- (empty file)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c	Thu Jan 22 13:34:53 2009
@@ -0,0 +1,183 @@
+/* 
+ * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
+ * Copyright (C) 2005/2006, Anthony Minessale II <anthmct at yahoo.com>
+ *
+ * Version: MPL 1.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
+ *
+ * The Initial Developer of the Original Code is
+ * Anthony Minessale II <anthmct at yahoo.com>
+ * Portions created by the Initial Developer are Copyright (C)
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ * 
+ * Anthony Minessale II <anthmct at yahoo.com>
+ * Andrew Thompson <andrew at hijacked.us>
+ * Rob Charlton <rob.charlton at savageminds.com>
+ *
+ *
+ * ei_helpers.c -- helper functions for ei
+ *
+ */
+#include <switch.h>
+#include <ei.h>
+#include "mod_erlang_event.h"
+
+/* Stolen from code added to ei in R12B-5.
+ * Since not everyone has this version yet,
+ * provide our own version.
+ *
+ * put8(s,n):    store the low 8 bits of n at s[0], then advance s by 1.
+ * put32be(s,n): store the low 32 bits of n at s[0]..s[3], most significant
+ *               byte first, then advance s by 4.
+ *
+ * Both macros assign to s, so s must be a modifiable pointer lvalue.
+ * put32be expands n four times, so n must not have side effects.
+ * */
+
+#define put8(s,n) do { \
+	  (s)[0] = (char)((n) & 0xff); \
+	  (s) += 1; \
+} while (0)
+
+#define put32be(s,n) do {  \
+	  (s)[0] = ((n) >>  24) & 0xff; \
+	  (s)[1] = ((n) >>  16) & 0xff; \
+	  (s)[2] = ((n) >>  8) & 0xff;  \
+	  (s)[3] = (n) & 0xff; \
+	  (s) += 4; \
+} while (0)
+
+/*
+ * Build an {ERL_LINK, From, To} control tuple by hand and write it to the
+ * listener's socket.
+ *
+ * The message is laid out as a 5 byte header (4 byte big-endian length that
+ * does not count itself, followed by ERL_PASS_THROUGH) and then the encoded
+ * term.  The term is encoded first, starting at offset 5, and the header is
+ * filled in afterwards once the total length is known.
+ *
+ * listener - connection to write to; its sock_mutex serialises the write
+ * from     - pid placed in the second tuple element
+ * to       - pid placed in the third tuple element
+ *
+ * A failed or short write is logged; nothing is returned to the caller.
+ */
+void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) {
+	char msgbuf[2048];
+	char *s;
+	int index = 5;                                 /* skip the header; max sizes: */
+	int sent;
+
+	ei_encode_version(msgbuf,&index);                     /*   1 */
+	ei_encode_tuple_header(msgbuf,&index,3);
+	ei_encode_long(msgbuf,&index,ERL_LINK);
+	ei_encode_pid(msgbuf,&index,from);                    /* 268 */
+	ei_encode_pid(msgbuf,&index,to);                      /* 268 */
+
+	/* now fill in the 5 byte header that was skipped above */
+	s = msgbuf;
+	put32be(s, index - 4);                                /*   4 */
+	put8(s, ERL_PASS_THROUGH);                            /*   1 */
+	/* sum:  542 */
+
+	switch_mutex_lock(listener->sock_mutex);
+	sent = (int) write(listener->sockfd, msgbuf, index);
+	switch_mutex_unlock(listener->sock_mutex);
+
+	if (sent != index) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send link message (wrote %d of %d bytes)\n", sent, index);
+	}
+}
+
+/*
+ * Append an event to ebuf as an Erlang list:
+ *
+ *   [UniqueId | undefined, {Name, Value}, ..., {"body", Body}]
+ *
+ * The first element is the event's "unique-id" header as a string, or the
+ * atom 'undefined' when the event has none.  It is followed by one 2-tuple
+ * per header and, when the event has a body, a final {"body", Body} tuple.
+ */
+void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event)
+{
+	switch_event_header_t *header;
+	char *uuid = switch_event_get_header(event, "unique-id");
+	int count = 1;	/* the leading unique-id / undefined element */
+
+	/* the list header needs the element count up front */
+	for (header = event->headers; header; header = header->next) {
+		count++;
+	}
+
+	if (event->body) {
+		count++;
+	}
+
+	ei_x_encode_list_header(ebuf, count);
+
+	if (!uuid) {
+		ei_x_encode_atom(ebuf, "undefined");
+	} else {
+		ei_x_encode_string(ebuf, uuid);
+	}
+
+	header = event->headers;
+	while (header) {
+		ei_x_encode_tuple_header(ebuf, 2);
+		ei_x_encode_string(ebuf, header->name);
+		ei_x_encode_string(ebuf, header->value);
+		header = header->next;
+	}
+
+	if (event->body) {
+		ei_x_encode_tuple_header(ebuf, 2);
+		ei_x_encode_string(ebuf, "body");
+		ei_x_encode_string(ebuf, event->body);
+	}
+
+	/* terminate the proper list */
+	ei_x_encode_empty_list(ebuf);
+}
+
+
+/*
+ * Append {Tag, EventList} to ebuf, where Tag is the atom named by tag and
+ * EventList is the list produced by ei_encode_switch_event_headers().
+ */
+void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag)
+{
+	/* 2-tuple: the tag atom, then the encoded event */
+	ei_x_encode_tuple_header(ebuf, 2);
+	ei_x_encode_atom(ebuf, tag);
+
+	ei_encode_switch_event_headers(ebuf, event);
+}
+
+
+/*
+ * Initialise the ei C node structure from the module preferences.
+ *
+ * prefs.ip is parsed as an IPv4 address and reverse-resolved to obtain the
+ * host part of the node name.  If the lookup fails, or yields "localhost",
+ * the local hostname from gethostname() is used instead.  With
+ * prefs.shortname set, the hostname is cut at its first '.'.
+ *
+ * ec - node structure passed to ei_connect_xinit()
+ *
+ * Returns SWITCH_STATUS_SUCCESS, or SWITCH_STATUS_FALSE if the address
+ * cannot be parsed, no hostname can be determined, or ei_connect_xinit()
+ * fails.
+ */
+switch_status_t initialise_ei(struct ei_cnode_s *ec)
+{
+	int rv;	/* inet_pton() returns an int, not a switch_status_t */
+	struct sockaddr_in server_addr;
+	struct hostent *nodehost;
+	char thishostname[EI_MAXHOSTNAMELEN + 1] = "";
+	char thisnodename[MAXNODELEN+1];
+
+	/* zero out the struct before we use it */
+	memset(&server_addr, 0, sizeof(server_addr));
+
+	/* convert the configured IP to network byte order, handling errors */
+	rv = inet_pton(AF_INET, prefs.ip, &server_addr.sin_addr.s_addr);
+	if (rv == 0) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip);
+		return SWITCH_STATUS_FALSE;
+	} else if (rv == -1) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error when parsing ip address %s : %s\n", prefs.ip, strerror(errno));
+		return SWITCH_STATUS_FALSE;
+	}
+
+	/* set the address family and port */
+	server_addr.sin_family = AF_INET;
+	server_addr.sin_port = htons(prefs.port);
+
+	/*
+	 * Copy the resolved name into our own buffer: h_name belongs to the
+	 * resolver and must not be overwritten or truncated in place.
+	 */
+	nodehost = gethostbyaddr(&server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET);
+	if (nodehost && nodehost->h_name) {
+		snprintf(thishostname, sizeof(thishostname), "%s", nodehost->h_name);
+	}
+
+	if (thishostname[0] == '\0' || !strcmp(thishostname, "localhost")) {
+		if (gethostname(thishostname, EI_MAXHOSTNAMELEN) != 0 && thishostname[0] == '\0') {
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not determine a hostname for ip address %s\n", prefs.ip);
+			return SWITCH_STATUS_FALSE;
+		}
+		/* gethostname() need not terminate the buffer on truncation */
+		thishostname[EI_MAXHOSTNAMELEN] = '\0';
+	}
+
+	if (prefs.shortname) {
+		char *off;
+		if ((off = strchr(thishostname, '.'))) {
+			*off = '\0';
+		}
+	}
+
+	snprintf(thisnodename, MAXNODELEN+1, "%s@%s", prefs.nodename, thishostname);
+
+	/* init the ei stuff */
+	if (ei_connect_xinit(ec, thishostname, prefs.nodename, thisnodename, (Erl_IpAddr)(&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n");
+		return SWITCH_STATUS_FALSE;
+	}
+
+	/* success is informational, not an error */
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "ei initialized at %s\n", thisnodename);
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4:
+ */

Added: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c
==============================================================================
--- (empty file)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c	Thu Jan 22 13:34:53 2009
@@ -0,0 +1,713 @@
+/* 
+ * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
+ * Copyright (C) 2005/2006, Anthony Minessale II <anthmct at yahoo.com>
+ *
+ * Version: MPL 1.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
+ *
+ * The Initial Developer of the Original Code is
+ * Anthony Minessale II <anthmct at yahoo.com>
+ * Portions created by the Initial Developer are Copyright (C)
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ * 
+ * Anthony Minessale II <anthmct at yahoo.com>
+ * Andrew Thompson <andrew at hijacked.us>
+ * Rob Charlton <rob.charlton at savageminds.com>
+ *
+ *
+ * handle_msg.c -- handle messages received from erlang nodes
+ *
+ */
+#include <switch.h>
+#include <ei.h>
+#include "mod_erlang_event.h"
+
+static char *MARKER = "1";	/* value stored in listener->event_hash for each subscribed custom event name */
+
+/*
+ * Execute one API command described by an api_command_struct and send the
+ * outcome to the requesting Erlang process (acs->pid).
+ *
+ * Foreground (acs->bg == 0): sends {ok, Reply} or {error, Reply}.
+ * Background (acs->bg != 0): fires a SWITCH_EVENT_BACKGROUND_JOB event,
+ * sends {bgok | bgerror, JobUuid, Reply}, and destroys acs->pool (which
+ * owns acs) before returning.
+ *
+ * A command counts as failed when switch_api_execute() fails, when there is
+ * no output, or when the output starts with '-'.
+ *
+ * Called directly for foreground commands and used as a thread entry point
+ * for background ones; always returns NULL.
+ */
+static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
+{
+	struct api_command_struct *acs = (struct api_command_struct *) obj;
+	switch_stream_handle_t stream = { 0 };
+	switch_bool_t success = SWITCH_TRUE;
+	char *reply;
+	char *freply = NULL;
+
+	if (!acs) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n");
+		return NULL;
+	}
+
+	/* hold a read lock on the listener for as long as we use it */
+	if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n");
+		goto done;
+	}
+
+	SWITCH_STANDARD_STREAM(stream);
+
+	if (switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream) != SWITCH_STATUS_SUCCESS) {
+		freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd);
+		reply = freply;
+		success = SWITCH_FALSE;
+	} else {
+		reply = stream.data;
+	}
+
+	if (!reply) {
+		reply = "Command returned no output!";
+		success = SWITCH_FALSE;
+	}
+
+	/* output beginning with '-' is treated as an error reply */
+	if (*reply == '-') {
+		success = SWITCH_FALSE;
+	}
+
+	if (!acs->bg) {
+		ei_x_buff rbuf;
+
+		if (!strlen(reply)) {
+			reply = "Command returned no output!";
+			success = SWITCH_FALSE;
+		}
+
+		ei_x_new_with_version(&rbuf);
+		ei_x_encode_tuple_header(&rbuf, 2);
+		ei_x_encode_atom(&rbuf, success ? "ok" : "error");
+		ei_x_encode_string(&rbuf, reply);
+
+		switch_mutex_lock(acs->listener->sock_mutex);
+		ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
+		switch_mutex_unlock(acs->listener->sock_mutex);
+
+		ei_x_free(&rbuf);
+	} else {
+		switch_event_t *event;
+
+		if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) {
+			ei_x_buff ebuf;
+
+			/* announce the finished job on the event system */
+			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str);
+			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd);
+
+			if (acs->arg) {
+				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg);
+			}
+
+			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", success ? "true" : "false");
+			switch_event_add_body(event, "%s", reply);
+
+			switch_event_fire(&event);
+
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node);
+
+			/* and tell the requesting process directly */
+			ei_x_new_with_version(&ebuf);
+			ei_x_encode_tuple_header(&ebuf, 3);
+			ei_x_encode_atom(&ebuf, success ? "bgok" : "bgerror");
+			ei_x_encode_string(&ebuf, acs->uuid_str);
+			ei_x_encode_string(&ebuf, reply);
+
+			switch_mutex_lock(acs->listener->sock_mutex);
+			ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
+			switch_mutex_unlock(acs->listener->sock_mutex);
+
+			ei_x_free(&ebuf);
+		}
+	}
+
+	switch_safe_free(stream.data);
+	switch_safe_free(freply);
+
+	if (acs->listener->rwlock) {
+		switch_thread_rwlock_unlock(acs->listener->rwlock);
+	}
+
+  done:
+	/* a background job owns its pool; acs lives in it, so drop acs first */
+	if (acs->bg) {
+		switch_memory_pool_t *pool = acs->pool;
+		acs = NULL;
+		switch_core_destroy_memory_pool(&pool);
+		pool = NULL;
+	}
+
+	return NULL;
+}
+
+/*
+ * Handle {fetch_reply, UuidString, ...}.
+ *
+ * Decodes the uuid string, copies the whole incoming buffer (with the decode
+ * index positioned just after the uuid) into the listener's pool, and stores
+ * the copy in listener->fetch_reply_hash under that uuid.  Replies 'ok', or
+ * {error, badarg} if the uuid is missing, not a string, or too long for the
+ * local buffer.
+ *
+ * Always returns SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
+	int type = 0, size = 0;
+
+	/* ei_decode_string() does no bounds checking, so check the length first */
+	if (ei_get_type(buf->buff, &buf->index, &type, &size) ||
+		size > SWITCH_UUID_FORMATTED_LENGTH ||
+		ei_decode_string(buf->buff, &buf->index, uuid_str)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+	}
+	else {
+		/* allocate the struct itself, not just a pointer's worth of bytes */
+		ei_x_buff *nbuf = switch_core_alloc(listener->pool, sizeof(*nbuf));
+		nbuf->buff = switch_core_alloc(listener->pool, buf->buffsz);
+		memcpy(nbuf->buff, buf->buff, buf->buffsz);
+		nbuf->index = buf->index;
+		nbuf->buffsz = buf->buffsz;
+
+		switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf);
+		ei_x_encode_atom(rbuf, "ok");
+	}
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Handle {set_log_level, Level}.
+ *
+ * Level is an atom naming a log level.  When the tuple has exactly two
+ * elements and switch_log_str2level() maps the atom to a non-zero, valid
+ * level, that level is stored in listener->level and 'ok' is replied.
+ * Anything else yields {error, badarg}.
+ *
+ * Always returns SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_set_log_level(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char loglevelstr[MAXATOMLEN];
+	switch_log_level_t ltype = SWITCH_LOG_DEBUG;
+	switch_bool_t accepted = SWITCH_FALSE;
+
+	if (arity == 2 && !ei_decode_atom(buf->buff, &buf->index, loglevelstr)) {
+		ltype = switch_log_str2level(loglevelstr);
+
+		if (ltype && ltype != SWITCH_LOG_INVALID) {
+			listener->level = ltype;
+			accepted = SWITCH_TRUE;
+		}
+	}
+
+	if (accepted) {
+		ei_x_encode_atom(rbuf, "ok");
+	} else {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+	}
+
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Handle {event, Name1, Name2, ...}: subscribe the listener to events.
+ *
+ * Each remaining tuple element is decoded as an atom.  Names that
+ * switch_name_event() recognises set the matching slot in
+ * listener->event_list; the ALL type sets every slot.  Once the CUSTOM type
+ * has been seen, every following atom is stored in listener->event_hash as
+ * a custom event name.  Elements that are not atoms are skipped.
+ *
+ * A bare {event} (arity 1) is answered with {error, badarg}; otherwise 'ok'.
+ * Always returns SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char atom[MAXATOMLEN];
+	switch_event_types_t type;
+	int custom = 0;
+
+	if (arity == 1) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	if (!switch_test_flag(listener, LFLAG_EVENTS)) {
+		switch_set_flag_locked(listener, LFLAG_EVENTS);
+	}
+
+	for (int i = 1; i < arity; i++) {
+		if (ei_decode_atom(buf->buff, &buf->index, atom)) {
+			/* not an atom: ignore this element */
+			continue;
+		}
+
+		if (custom) {
+			/* everything after CUSTOM is a custom event name */
+			switch_core_hash_insert(listener->event_hash, atom, MARKER);
+		} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
+			if (type == SWITCH_EVENT_ALL) {
+				uint32_t x;
+
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n");
+				for (x = 0; x < SWITCH_EVENT_ALL; x++) {
+					listener->event_list[x] = 1;
+				}
+			}
+
+			if (type <= SWITCH_EVENT_ALL) {
+				listener->event_list[type] = 1;
+			}
+
+			if (type == SWITCH_EVENT_CUSTOM) {
+				custom++;
+			}
+		}
+
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
+	}
+
+	ei_x_encode_atom(rbuf, "ok");
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Handle {nixevent, Name1, Name2, ...}: unsubscribe the listener from events.
+ *
+ * Each remaining tuple element is decoded as an atom.  The ALL type clears
+ * every slot in listener->event_list.  Any other recognised type clears its
+ * own slot; if the ALL slot was set, it is first expanded into every
+ * individual slot so the rest stay subscribed.  Once the CUSTOM type has
+ * been seen, every following atom is removed from listener->event_hash.
+ * Elements that are not atoms are skipped.
+ *
+ * A bare {nixevent} (arity 1) is answered with {error, badarg}; otherwise
+ * 'ok'.  Always returns SWITCH_STATUS_SUCCESS so that rbuf is sent.
+ */
+static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char atom[MAXATOMLEN];
+	switch_event_types_t type;
+	int custom = 0;
+
+	if (arity == 1) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	for (int i = 1; i < arity; i++) {
+		uint32_t x = 0;
+
+		if (ei_decode_atom(buf->buff, &buf->index, atom)) {
+			/* not an atom: ignore this element */
+			continue;
+		}
+
+		if (custom) {
+			switch_core_hash_delete(listener->event_hash, atom);
+			continue;
+		}
+
+		if (switch_name_event(atom, &type) != SWITCH_STATUS_SUCCESS) {
+			continue;
+		}
+
+		if (type == SWITCH_EVENT_CUSTOM) {
+			custom++;
+		} else if (type == SWITCH_EVENT_ALL) {
+			for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
+				listener->event_list[x] = 0;
+			}
+		} else {
+			/* turn a blanket ALL subscription into individual ones first */
+			if (listener->event_list[SWITCH_EVENT_ALL]) {
+				listener->event_list[SWITCH_EVENT_ALL] = 0;
+				for (x = 0; x < SWITCH_EVENT_ALL; x++) {
+					listener->event_list[x] = 1;
+				}
+			}
+			listener->event_list[type] = 0;
+		}
+	}
+
+	ei_x_encode_atom(rbuf, "ok");
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Handle {api, Command, ArgString}: run an API command synchronously.
+ *
+ * Command is an atom and ArgString a string.  On a malformed request
+ * (too few elements, wrong types, or an argument that does not fit the
+ * local buffer) {error, badarg} is placed in rbuf and SWITCH_STATUS_SUCCESS
+ * is returned so the caller sends it.
+ *
+ * Otherwise the command is executed via api_exec(), which sends its own
+ * reply to msg->from; SWITCH_STATUS_FALSE is returned so that the caller
+ * does not send rbuf as well.
+ */
+static switch_status_t handle_msg_api(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char api_cmd[MAXATOMLEN];
+	char arg[1024];
+	int type = 0, size = 0;
+	struct api_command_struct acs = { 0 };
+
+	/* ei_decode_string() does no bounds checking, so check the length first */
+	if (arity < 3 ||
+		ei_decode_atom(buf->buff, &buf->index, api_cmd) ||
+		ei_get_type(buf->buff, &buf->index, &type, &size) ||
+		size >= (int) sizeof(arg) ||
+		ei_decode_string(buf->buff, &buf->index, arg)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	acs.listener = listener;
+	acs.api_cmd = api_cmd;
+	acs.arg = arg;
+	acs.bg = 0;
+	acs.pid = msg->from;
+	api_exec(NULL, (void *) &acs);
+
+	/* don't reply */
+	return SWITCH_STATUS_FALSE;
+}
+
+/*
+ * Handle {bgapi, Command, ArgString}: run an API command in a detached
+ * background thread.
+ *
+ * A private memory pool is created to hold the job description; api_exec()
+ * destroys that pool when the job finishes.  The immediate reply is
+ * {ok, JobUuid}; the command's result is sent later by api_exec().
+ *
+ * Replies {error, badarg} for a malformed request (too few elements, wrong
+ * types, or an argument that does not fit the local buffer) and
+ * {error, badmem} if the worker thread cannot be started.
+ *
+ * Always returns SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_bgapi(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char api_cmd[MAXATOMLEN];
+	char arg[1024];
+	int type = 0, size = 0;
+
+	/* ei_decode_string() does no bounds checking, so check the length first */
+	if (arity < 3 ||
+		ei_decode_atom(buf->buff, &buf->index, api_cmd) ||
+		ei_get_type(buf->buff, &buf->index, &type, &size) ||
+		size >= (int) sizeof(arg) ||
+		ei_decode_string(buf->buff, &buf->index, arg)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+	}
+	else {
+		struct api_command_struct *acs = NULL;
+		switch_memory_pool_t *pool;
+		switch_thread_t *thread;
+		switch_threadattr_t *thd_attr = NULL;
+		switch_uuid_t uuid;
+		int reply_start = rbuf->index;	/* lets us discard the reply on failure */
+
+		switch_core_new_memory_pool(&pool);
+		acs = switch_core_alloc(pool, sizeof(*acs));
+		switch_assert(acs);
+		acs->pool = pool;
+		acs->listener = listener;
+		acs->api_cmd = switch_core_strdup(acs->pool, api_cmd);
+		acs->arg = switch_core_strdup(acs->pool, arg);
+		acs->bg = 1;
+		acs->pid = msg->from;
+
+		switch_threadattr_create(&thd_attr, acs->pool);
+		switch_threadattr_detach_set(thd_attr, 1);
+		switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+
+		switch_uuid_get(&uuid);
+		switch_uuid_format(acs->uuid_str, &uuid);
+
+		/*
+		 * Encode the reply BEFORE starting the thread: once it runs, the
+		 * thread may finish and destroy the pool that owns acs at any time,
+		 * so acs must not be touched after switch_thread_create() succeeds.
+		 */
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "ok");
+		ei_x_encode_string(rbuf, acs->uuid_str);
+
+		if (switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool) != SWITCH_STATUS_SUCCESS) {
+			/* no thread will ever free the pool, so do it here */
+			acs = NULL;
+			switch_core_destroy_memory_pool(&pool);
+
+			rbuf->index = reply_start;
+			ei_x_encode_tuple_header(rbuf, 2);
+			ei_x_encode_atom(rbuf, "error");
+			ei_x_encode_atom(rbuf, "badmem");
+		}
+	}
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Handle {sendevent, EventName, [{Key, Value}, ...]}: build and fire an
+ * event.
+ *
+ * EventName is an atom; each list element must be a 2-tuple of strings and
+ * becomes one event header.  Replies:
+ *   ok               - the event was fired
+ *   {error, badarg}  - malformed request, unknown event name, a key or
+ *                      value that is not a string or does not fit the local
+ *                      buffers, or fewer valid pairs than the list announced
+ *   {error, badmem}  - the event could not be created
+ *
+ * An event that is not fired is destroyed.  Always returns
+ * SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_sendevent(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char ename[MAXATOMLEN];
+	char key[1024];
+	char value[1024];
+	int headerlength;
+	int type = 0, size = 0;
+	int i = 0;
+	switch_bool_t fail = SWITCH_FALSE;
+	switch_event_types_t etype;
+	switch_event_t *event = NULL;
+
+	if (ei_decode_atom(buf->buff, &buf->index, ename) ||
+		ei_decode_list_header(buf->buff, &buf->index, &headerlength) ||
+		switch_name_event(ename, &etype) != SWITCH_STATUS_SUCCESS) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	if (switch_event_create(&event, etype) != SWITCH_STATUS_SUCCESS) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badmem");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	/* the loop ends at the first term that is not a 2-tuple (normally the list tail) */
+	while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
+		i++;
+		/* ei_decode_string() does no bounds checking, so check each length first */
+		if (ei_get_type(buf->buff, &buf->index, &type, &size) ||
+			size >= (int) sizeof(key) ||
+			ei_decode_string(buf->buff, &buf->index, key) ||
+			ei_get_type(buf->buff, &buf->index, &type, &size) ||
+			size >= (int) sizeof(value) ||
+			ei_decode_string(buf->buff, &buf->index, value)) {
+			fail = SWITCH_TRUE;
+			break;
+		}
+
+		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value);
+	}
+
+	if (headerlength != i || fail) {
+		/* the event will never be fired, so release it here */
+		switch_event_destroy(&event);
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+	}
+	else {
+		switch_event_fire(&event);
+		ei_x_encode_atom(rbuf, "ok");
+	}
+
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Handle {sendmsg, UuidString, [{Key, Value}, ...]}: queue a
+ * SWITCH_EVENT_SEND_MESSAGE private event on the session with that uuid.
+ *
+ * Each list element must be a 2-tuple of strings and becomes one event
+ * header.  Replies:
+ *   ok                  - the event was queued on the session
+ *   {error, badarg}     - malformed request, a uuid, key or value that is
+ *                         not a string or does not fit the local buffers,
+ *                         or fewer valid pairs than the list announced
+ *   {error, nosession}  - empty uuid or no such session
+ *   {error, badmem}     - the event could not be created or queued
+ *
+ * An event that is not queued is destroyed.  Always returns
+ * SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char uuid[37];
+	char key[1024];
+	char value[1024];
+	int headerlength;
+	int type = 0, size = 0;
+	int i = 0;
+	switch_bool_t fail = SWITCH_FALSE;
+	switch_core_session_t *session;
+	switch_event_t *event = NULL;
+
+	/* ei_decode_string() does no bounds checking, so check the length first */
+	if (ei_get_type(buf->buff, &buf->index, &type, &size) ||
+		size >= (int) sizeof(uuid) ||
+		ei_decode_string(buf->buff, &buf->index, uuid) ||
+		ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	if (switch_strlen_zero(uuid) || !(session = switch_core_session_locate(uuid))) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "nosession");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) != SWITCH_STATUS_SUCCESS) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badmem");
+	}
+	else {
+		/* the loop ends at the first term that is not a 2-tuple (normally the list tail) */
+		while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
+			i++;
+			if (ei_get_type(buf->buff, &buf->index, &type, &size) ||
+				size >= (int) sizeof(key) ||
+				ei_decode_string(buf->buff, &buf->index, key) ||
+				ei_get_type(buf->buff, &buf->index, &type, &size) ||
+				size >= (int) sizeof(value) ||
+				ei_decode_string(buf->buff, &buf->index, value)) {
+				fail = SWITCH_TRUE;
+				break;
+			}
+			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value);
+		}
+
+		if (headerlength != i || fail) {
+			ei_x_encode_tuple_header(rbuf, 2);
+			ei_x_encode_atom(rbuf, "error");
+			ei_x_encode_atom(rbuf, "badarg");
+		}
+		else if (switch_core_session_queue_private_event(session, &event) == SWITCH_STATUS_SUCCESS) {
+			ei_x_encode_atom(rbuf, "ok");
+		}
+		else {
+			ei_x_encode_tuple_header(rbuf, 2);
+			ei_x_encode_atom(rbuf, "error");
+			ei_x_encode_atom(rbuf, "badmem");
+		}
+
+		/* still ours if it was not handed over to the session */
+		if (event) {
+			switch_event_destroy(&event);
+		}
+	}
+
+	/* release the lock returned by switch_core_session_locate */
+	switch_core_session_rwunlock(session);
+
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Handle {bind, Section}: register the sending process as a handler for an
+ * XML section.
+ *
+ * Section is an atom that switch_xml_parse_section_string() must map to a
+ * non-zero section.  A new erlang_binding is allocated from the listener's
+ * pool, appended to the global bindings list under globals.listener_mutex,
+ * and the section is OR-ed into the sections of bindings.search_binding.
+ * Finally a link to the sending process is requested.
+ *
+ * Replies 'ok', {error, badarg} for an undecodable or unknown section, or
+ * {error, badmem} if the binding cannot be allocated.  Always returns
+ * SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	/* format is (result|config|directory|dialplan|phrases)  */
+	char sectionstr[MAXATOMLEN];
+	switch_xml_section_t section;
+	struct erlang_binding *binding, *tail;
+
+	if (ei_decode_atom(buf->buff, &buf->index, sectionstr) ||
+		!(section = switch_xml_parse_section_string(sectionstr))) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	binding = switch_core_alloc(listener->pool, sizeof(*binding));
+	if (!binding) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badmem");
+		return SWITCH_STATUS_SUCCESS;
+	}
+
+	binding->section = section;
+	binding->pid = msg->from;
+	binding->listener = listener;
+
+	/* NOTE(review): this runs on every bind, so a second bind from the same
+	 * listener appears to replace a hash that may already hold pending
+	 * fetch replies -- confirm whether that is intended */
+	switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
+
+	switch_mutex_lock(globals.listener_mutex);
+
+	/* walk to the last binding so the new one is appended at the end */
+	tail = bindings.head;
+	while (tail && tail->next) {
+		tail = tail->next;
+	}
+
+	if (!tail) {
+		bindings.head = binding;
+	} else {
+		tail->next = binding;
+	}
+
+	switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
+	switch_mutex_unlock(globals.listener_mutex);
+
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
+
+	ei_link(listener, ei_self(listener->ec), &msg->from);
+	ei_x_encode_atom(rbuf, "ok");
+
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * {handlecall,<uuid>,<handler process registered name>}
+ *
+ * Both elements are decoded as strings.  The session with the given uuid is
+ * looked up and handed, together with the registered name, to
+ * attach_call_to_listener().  Replies:
+ *   ok                   - the call was attached
+ *   {error, badsession}  - attach_call_to_listener() refused
+ *   {error, badarg}      - wrong arity, an element that is not a string or
+ *                          does not fit the local buffers, an empty uuid,
+ *                          or no such session
+ *
+ * Always returns SWITCH_STATUS_SUCCESS so that rbuf is sent to the caller.
+ */
+static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char reg_name[MAXATOMLEN];
+	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
+	int type = 0, size = 0;
+
+	/* ei_decode_string() does no bounds checking, so check each length first */
+	if (arity != 3 ||
+		ei_get_type(buf->buff, &buf->index, &type, &size) ||
+		size > SWITCH_UUID_FORMATTED_LENGTH ||
+		ei_decode_string(buf->buff, &buf->index, uuid_str) ||
+		ei_get_type(buf->buff, &buf->index, &type, &size) ||
+		size >= (int) sizeof(reg_name) ||
+		ei_decode_string(buf->buff, &buf->index, reg_name)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+	}
+	else {
+		switch_core_session_t *session;
+		if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) {
+			/* create a new session list element and attach it to this listener */
+			/* NOTE(review): the lock taken by switch_core_session_locate() is not
+			 * released here (handle_msg_sendmsg does release it) -- confirm
+			 * whether attach_call_to_listener() takes ownership of it */
+			if (attach_call_to_listener(listener,reg_name,session)) {
+				ei_x_encode_atom(rbuf, "ok");
+			}
+			else {
+				ei_x_encode_tuple_header(rbuf, 2);
+				ei_x_encode_atom(rbuf, "error");
+				ei_x_encode_atom(rbuf, "badsession");
+			}
+		}
+		else {
+			ei_x_encode_tuple_header(rbuf, 2);
+			ei_x_encode_atom(rbuf, "error");
+			ei_x_encode_atom(rbuf, "badarg");
+		}
+	}
+	return SWITCH_STATUS_SUCCESS;
+}
+
+/*
+ * Dispatch a tuple message on its first element.
+ *
+ * The first element must be an atom naming the request (fetch_reply,
+ * set_log_level, event, nixevent, api, bgapi, sendevent, sendmsg, bind or
+ * handlecall); the matching handler decodes the remaining elements.
+ * Replies {error, badarg} if the tuple header or the tag cannot be decoded
+ * and {error, undef} for an unknown tag.
+ *
+ * Returns the handler's status; SWITCH_STATUS_SUCCESS for the error replies
+ * produced here.
+ */
+static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char tupletag[MAXATOMLEN];
+	int arity = 0;
+	switch_status_t ret = SWITCH_STATUS_SUCCESS;
+
+	/* arity is only meaningful if the tuple header itself decoded */
+	if (ei_decode_tuple_header(buf->buff, &buf->index, &arity) ||
+		ei_decode_atom(buf->buff, &buf->index, tupletag)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+	}
+	else {
+		if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) {
+			ret = handle_msg_fetch_reply(listener,buf,rbuf);
+		} else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) {
+			ret = handle_msg_set_log_level(listener,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "event", MAXATOMLEN)) {
+			ret = handle_msg_event(listener,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) {
+			ret = handle_msg_nixevent(listener,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "api", MAXATOMLEN)) {
+			ret = handle_msg_api(listener,msg,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) {
+			ret = handle_msg_bgapi(listener,msg,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) {
+			ret = handle_msg_sendevent(listener,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) {
+			ret = handle_msg_sendmsg(listener,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
+			ret = handle_msg_bind(listener,msg,buf,rbuf);
+		} else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) {
+			ret = handle_msg_handlecall(listener,arity,buf,rbuf);
+		} else {
+			ei_x_encode_tuple_header(rbuf, 2);
+			ei_x_encode_atom(rbuf, "error");
+			ei_x_encode_atom(rbuf, "undef");
+		}
+	}
+	return ret;
+}
+
+/*
+ * Handle a message that is a single atom.
+ *
+ *   nolog                   clear LFLAG_LOG; replies ok
+ *   register_log_handler    link to the sender, record it as log_pid, set
+ *                           the level to DEBUG and set LFLAG_LOG; replies ok
+ *   register_event_handler  link to the sender, record it as event_pid and
+ *                           set LFLAG_EVENTS; replies ok
+ *   noevents                drain the event queue; if LFLAG_EVENTS was set,
+ *                           clear it along with every event subscription
+ *                           and reply ok, else {error, notlistening}
+ *   exit                    replies ok and returns SWITCH_STATUS_TERM
+ *   getpid                  replies {ok, Pid} with this node's own pid
+ *   link                    (debugging) link to the sender; no reply,
+ *                           returns SWITCH_STATUS_FALSE
+ *
+ * An undecodable atom yields {error, badarg}; an unknown one {error, undef}.
+ * Returns SWITCH_STATUS_SUCCESS unless noted above.
+ */
+static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	char atom[MAXATOMLEN];
+	switch_status_t ret = SWITCH_STATUS_SUCCESS;
+
+	if (ei_decode_atom(buf->buff, &buf->index, atom)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "badarg");
+	}
+	else if (!strncmp(atom, "nolog", MAXATOMLEN)) {
+		if (switch_test_flag(listener, LFLAG_LOG)) {
+			switch_clear_flag_locked(listener, LFLAG_LOG);
+		}
+		ei_x_encode_atom(rbuf, "ok");
+	} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
+		ei_link(listener, ei_self(listener->ec), &msg->from);
+		listener->log_pid = msg->from;
+		listener->level = SWITCH_LOG_DEBUG;
+		switch_set_flag(listener, LFLAG_LOG);
+		ei_x_encode_atom(rbuf, "ok");
+	} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
+		ei_link(listener, ei_self(listener->ec), &msg->from);
+		listener->event_pid = msg->from;
+		if (!switch_test_flag(listener, LFLAG_EVENTS)) {
+			switch_set_flag_locked(listener, LFLAG_EVENTS);
+		}
+		ei_x_encode_atom(rbuf, "ok");
+	} else if (!strncmp(atom, "noevents", MAXATOMLEN)) {
+		void *pop;
+		/* purge the event queue */
+		/* NOTE(review): popped items are discarded without being destroyed;
+		 * confirm whether the queue owns the events it holds */
+		while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
+
+		if (switch_test_flag(listener, LFLAG_EVENTS)) {
+			/* same counter type as the other handlers: an 8 bit counter
+			 * compared with <= would never terminate if the bound reached 255 */
+			uint32_t x = 0;
+			switch_clear_flag_locked(listener, LFLAG_EVENTS);
+			for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
+				listener->event_list[x] = 0;
+			}
+			/* wipe the hash */
+			switch_core_hash_destroy(&listener->event_hash);
+			switch_core_hash_init(&listener->event_hash, listener->pool);
+			ei_x_encode_atom(rbuf, "ok");
+		} else {
+			ei_x_encode_tuple_header(rbuf, 2);
+			ei_x_encode_atom(rbuf, "error");
+			ei_x_encode_atom(rbuf, "notlistening");
+		}
+	} else if (!strncmp(atom, "exit", MAXATOMLEN)) {
+		ei_x_encode_atom(rbuf, "ok");
+		ret = SWITCH_STATUS_TERM;
+	} else if (!strncmp(atom, "getpid", MAXATOMLEN)) {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "ok");
+		ei_x_encode_pid(rbuf, ei_self(listener->ec));
+	} else if (!strncmp(atom, "link", MAXATOMLEN)) {
+		/* debugging */
+		ei_link(listener, ei_self(listener->ec), &msg->from);
+		ret = SWITCH_STATUS_FALSE;
+	} else {
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "undef");
+	}
+
+	return ret;
+}
+
+/*
+ * Entry point for a message received from an Erlang node.
+ *
+ * Decodes the version byte and the type of the top-level term, then hands
+ * tuples to handle_msg_tuple() and atoms to handle_msg_atom().  Any other
+ * term, or a buffer whose version or type cannot be decoded, is answered
+ * with {error, undef}.
+ *
+ * Unless the handler returned SWITCH_STATUS_FALSE (meaning "do not reply"),
+ * the contents of rbuf are sent to msg->from under the listener's
+ * sock_mutex.
+ *
+ * Returns 1 if the handler returned a status other than SUCCESS or FALSE
+ * (SWITCH_STATUS_TERM, from the 'exit' atom), 0 otherwise.
+ */
+int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	int type = 0, size = 0, version = 0;
+	switch_status_t ret = SWITCH_STATUS_SUCCESS;
+
+	buf->index = 0;
+	if (ei_decode_version(buf->buff, &buf->index, &version) ||
+		ei_get_type(buf->buff, &buf->index, &type, &size)) {
+		/* undecodable: 0 matches no term tag, so the default branch replies */
+		type = 0;
+	}
+
+	switch(type) {
+	case ERL_SMALL_TUPLE_EXT :
+	case ERL_LARGE_TUPLE_EXT :
+		ret = handle_msg_tuple(listener,msg,buf,rbuf);
+		break;
+
+	case ERL_ATOM_EXT :
+		ret = handle_msg_atom(listener,msg,buf,rbuf);
+		break;
+
+	default :
+		/* some other kind of erlang term */
+		ei_x_encode_tuple_header(rbuf, 2);
+		ei_x_encode_atom(rbuf, "error");
+		ei_x_encode_atom(rbuf, "undef");
+		break;
+	}
+
+	if (SWITCH_STATUS_FALSE==ret)
+		return 0;
+	else {
+		switch_mutex_lock(listener->sock_mutex);
+		ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
+		switch_mutex_unlock(listener->sock_mutex);
+
+		if (SWITCH_STATUS_SUCCESS==ret)
+			return 0;
+		else /* SWITCH_STATUS_TERM */
+			return 1;
+	}
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4:
+ */

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	Thu Jan 22 13:34:53 2009
@@ -25,101 +25,22 @@
  * 
  * Anthony Minessale II <anthmct at yahoo.com>
  * Andrew Thompson <andrew at hijacked.us>
+ * Rob Charlton <rob.charlton at savageminds.com>
  *
  *
  * mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket
  *
  */
 #include <switch.h>
-
 #include <ei.h>
+#define DEFINE_GLOBALS
+#include "mod_erlang_event.h"
 
 SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load);
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown);
 SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime);
 SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_event_shutdown, mod_erlang_event_runtime);
 
-static char *MARKER = "1";
-
-typedef enum {
-	LFLAG_AUTHED = (1 << 0),
-	LFLAG_RUNNING = (1 << 1),
-	LFLAG_EVENTS = (1 << 2),
-	LFLAG_LOG = (1 << 3),
-	LFLAG_FULL = (1 << 4),
-	LFLAG_MYEVENTS = (1 << 5),
-	LFLAG_SESSION = (1 << 6),
-	LFLAG_ASYNC = (1 << 7),
-	LFLAG_STATEFUL = (1 << 8)
-} event_flag_t;
-
-struct listener {
-	int sockfd;
-	struct ei_cnode_s *ec;
-	erlang_pid log_pid;
-	erlang_pid event_pid;
-	switch_queue_t *event_queue;
-	switch_queue_t *log_queue;
-	switch_memory_pool_t *pool;
-	switch_mutex_t *flag_mutex;
-	switch_mutex_t *sock_mutex;
-	char *ebuf;
-	uint32_t flags;
-	switch_log_level_t level;
-	uint8_t event_list[SWITCH_EVENT_ALL + 1];
-	switch_hash_t *event_hash;
-	switch_hash_t *fetch_reply_hash;
-	switch_thread_rwlock_t *rwlock;
-	switch_core_session_t *session;
-	int lost_events;
-	int lost_logs;
-	time_t last_flush;
-	uint32_t timeout;
-	uint32_t id;
-	char remote_ip[50];
-	/*switch_port_t remote_port;*/
-	struct listener *next;
-};
-
-typedef struct listener listener_t;
-
-static struct {
-	int sockfd;
-	switch_mutex_t *sock_mutex;
-	listener_t *listeners;
-	uint8_t ready;
-} listen_list;
-
-#define MAX_ACL 100
-
-struct erlang_binding {
-	switch_xml_section_t section;
-	erlang_pid pid;
-	char *registered_process; /* TODO */
-	listener_t *listener;
-	struct erlang_binding *next;
-};
-
-static struct {
-	struct erlang_binding *head;
-	switch_xml_binding_t *search_binding;
-} bindings;
-
-static struct {
-	switch_mutex_t *mutex;
-	char *ip;
-	char *nodename;
-	switch_bool_t shortname;
-	uint16_t port;
-	char *cookie;
-	int done;
-	int threads;
-	char *acl[MAX_ACL];
-	uint32_t acl_count;
-	uint32_t id;
-} prefs;
-
-
 static void remove_listener(listener_t *listener);
 
 SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
@@ -129,12 +50,6 @@
 static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
 static void launch_listener_thread(listener_t *listener);
 
-static struct {
-	switch_mutex_t *listener_mutex;
-	switch_event_node_t *node;
-} globals;
-
-
 static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
 {
 	listener_t *l;
@@ -173,49 +88,6 @@
 }
 
 
-/* Stolen from code added to ei in R12B-5.
- * Since not everyone has this verison yet;
- * provide our own version. 
- * */
-
-#define put8(s,n) do { \
-	  (s)[0] = (char)((n) & 0xff); \
-	  (s) += 1; \
-} while (0)
-
-#define put32be(s,n) do {  \
-	  (s)[0] = ((n) >>  24) & 0xff; \
-	  (s)[1] = ((n) >>  16) & 0xff; \
-	  (s)[2] = ((n) >>  8) & 0xff;  \
-	  (s)[3] = (n) & 0xff; \
-	  (s) += 4; \
-} while (0)
-
-static void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) {
-	char msgbuf[2048];
-	char *s;
-	int index = 0;
-	/*int n;*/
-
-	index = 5;                                     /* max sizes: */
-	ei_encode_version(msgbuf,&index);                     /*   1 */
-	ei_encode_tuple_header(msgbuf,&index,3);
-	ei_encode_long(msgbuf,&index,ERL_LINK);
-	ei_encode_pid(msgbuf,&index,from);                    /* 268 */
-	ei_encode_pid(msgbuf,&index,to);                      /* 268 */
-
-	/* 5 byte header missing */
-	s = msgbuf;
-	put32be(s, index - 4);                                /*   4 */
-	put8(s, ERL_PASS_THROUGH);                            /*   1 */
-	/* sum:  542 */
-
-	switch_mutex_lock(listener->sock_mutex);
-	write(listener->sockfd, msgbuf, index);
-	switch_mutex_unlock(listener->sock_mutex);
-}
-
-
 static void expire_listener(listener_t **listener)
 {
 	void *pop;
@@ -268,54 +140,33 @@
 }
 
 
-static void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event)
+static void send_event_to_attached_sessions(listener_t* listener, switch_event_t *event)
 {
-	int i;
 	char *uuid = switch_event_get_header(event, "unique-id");
+	switch_event_t *clone = NULL;
+	session_elem_t* s;
 
-	switch_event_header_t *hp;
-
-	for (i = 0, hp = event->headers; hp; hp = hp->next, i++);
-
-	if (event->body)
-		i++;
-
-	ei_x_encode_list_header(ebuf, i+1);
-
-	if (uuid) {
-		ei_x_encode_string(ebuf, switch_event_get_header(event, "unique-id"));
-	} else {
-		ei_x_encode_atom(ebuf, "undefined");
-	}
-
-	for (hp = event->headers; hp; hp = hp->next) {
-		ei_x_encode_tuple_header(ebuf, 2);
-		ei_x_encode_string(ebuf, hp->name);
-		ei_x_encode_string(ebuf, hp->value);
-	}
-
-	if (event->body) {
-		ei_x_encode_tuple_header(ebuf, 2);
-		ei_x_encode_string(ebuf, "body");
-		ei_x_encode_string(ebuf, event->body);
+	if (!uuid)
+		return;
+	switch_mutex_lock(listener->session_mutex);
+	for (s = listener->session_list; s; s = s->next) {
+		/* check the event uuid against the uuid of each session */
+		if (!strcmp(uuid, switch_core_session_get_uuid(s->session))) {
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending event to attached session\n");
+			if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
+				/* add the event to the queue for this session */
+				if (switch_queue_trypush(s->event_queue, clone) != SWITCH_STATUS_SUCCESS) {
+					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Lost event!\n");
+					switch_event_destroy(&clone);
+				}
+			} else {
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n");
+			}
+		}
 	}
-
-	ei_x_encode_empty_list(ebuf);
-}
-
-
-static void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag)
-{
-
-	ei_x_encode_tuple_header(ebuf, 2);
-	ei_x_encode_atom(ebuf, tag);
-	ei_encode_switch_event_headers(ebuf, event);
+	switch_mutex_unlock(listener->session_mutex);
 }
 
-
-#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event")
-
-
 static void event_handler(switch_event_t *event)
 {
 	switch_event_t *clone = NULL;
@@ -335,7 +186,12 @@
 		
 		l = lp;
 		lp = lp->next;
-		
+
+		/* test all of the sessions attached to this listener in case
+		   one of them should receive it as well
+		 */
+		send_event_to_attached_sessions(l,event);
+
 		if (!switch_test_flag(l, LFLAG_EVENTS)) {
 			continue;
 		}
@@ -355,13 +211,6 @@
 			}
 		}
 		
-		if (send && switch_test_flag(l, LFLAG_MYEVENTS)) {
-			char *uuid = switch_event_get_header(event, "unique-id");
-			if (!uuid || strcmp(uuid, switch_core_session_get_uuid(l->session))) {
-				send = 0;
-			}
-		}
-
 		if (send) {
 			if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
 				if (switch_queue_trypush(l->event_queue, clone) == SWITCH_STATUS_SUCCESS) {
@@ -429,134 +278,27 @@
 	switch_mutex_unlock(globals.listener_mutex);
 }
 
+/* Search for a listener already talking to the specified node */
+static listener_t * find_listener(char* nodename)
+{
+	listener_t *l = NULL;
 
-struct api_command_struct {
-	char *api_cmd;
-	char *arg;
-	listener_t *listener;
-	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
-	uint8_t bg;
-	erlang_pid pid;
-	switch_memory_pool_t *pool;
-};
-
-
-static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
-{
-	switch_bool_t r = SWITCH_TRUE;
-	struct api_command_struct *acs = (struct api_command_struct *) obj;
-	switch_stream_handle_t stream = { 0 };
-	char *reply, *freply = NULL;
-	switch_status_t status;
-
-	if (!acs) {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n");
-		return NULL;
-	}
-
-	if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n");
-		goto done;
-	}
-
-
-	SWITCH_STANDARD_STREAM(stream);
-
-	if ((status = switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream)) == SWITCH_STATUS_SUCCESS) {
-		reply = stream.data;
-	} else {
-		freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd);
-		reply = freply;
-		r = SWITCH_FALSE;
-	}
-
-	if (!reply) {
-		reply = "Command returned no output!";
-		r = SWITCH_FALSE;
-	}
-
-	if (*reply == '-')
-		r = SWITCH_FALSE;
-
-	if (acs->bg) {
-		switch_event_t *event;
-
-		if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) {
-			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str);
-			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd);
-
-			ei_x_buff ebuf;
-			ei_x_new_with_version(&ebuf);
-
-			if (acs->arg) {
-				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg);
-			}
-
-			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", r ? "true" : "false");
-			switch_event_add_body(event, "%s", reply);
-
-			switch_event_fire(&event);
-
-			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node);
-
-			ei_x_encode_tuple_header(&ebuf, 3);
-
-			if (r)
-				ei_x_encode_atom(&ebuf, "bgok");
-			else
-				ei_x_encode_atom(&ebuf, "bgerror");
-
-			ei_x_encode_string(&ebuf, acs->uuid_str);
-			ei_x_encode_string(&ebuf, reply);
-
-			switch_mutex_lock(acs->listener->sock_mutex);
-			ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
-			switch_mutex_unlock(acs->listener->sock_mutex);
-
-			ei_x_free(&ebuf);
-		}
-	} else {
-		ei_x_buff rbuf;
-		ei_x_new_with_version(&rbuf);
-		ei_x_encode_tuple_header(&rbuf, 2);
-
-		if (!strlen(reply)) {
-			reply = "Command returned no output!";
-			r = SWITCH_FALSE;
-		}
-
-		if (r) {
-			ei_x_encode_atom(&rbuf, "ok");
-		} else {
-			ei_x_encode_atom(&rbuf, "error");
+	switch_mutex_lock(globals.listener_mutex);
+	for (l = listen_list.listeners; l; l = l->next) {
+		if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
+			break;
 		}
-		
-		ei_x_encode_string(&rbuf, reply);
-
-
-		switch_mutex_lock(acs->listener->sock_mutex);
-		ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
-		switch_mutex_unlock(acs->listener->sock_mutex);
-
-		ei_x_free(&rbuf);
 	}
+	switch_mutex_unlock(globals.listener_mutex);
+	return l;
+}
 
-	switch_safe_free(stream.data);
-	switch_safe_free(freply);
-
-	if (acs->listener->rwlock) {
-		switch_thread_rwlock_unlock(acs->listener->rwlock);
-	}
-
-  done:
-	if (acs->bg) {
-		switch_memory_pool_t *pool = acs->pool;
-		acs = NULL;
-		switch_core_destroy_memory_pool(&pool);
-		pool = NULL;
-	}
-	return NULL;
-
+static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element)
+{
+	switch_mutex_lock(listener->session_mutex);
+	session_element->next = listener->session_list;
+	listener->session_list = session_element;
+	switch_mutex_unlock(listener->session_mutex);
 }
 
 static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value,
@@ -650,571 +392,187 @@
 }
 
 
-static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
+static switch_status_t notify_new_session(listener_t *listener, switch_core_session_t *session, char* reg_name)
 {
-	int type, size, version, arity;
-	char tupletag[MAXATOMLEN];
-	char atom[MAXATOMLEN];
-
-	buf->index = 0;
-	ei_decode_version(buf->buff, &buf->index, &version);
-	ei_get_type(buf->buff, &buf->index, &type, &size);
-
-	switch(type) {
-	case ERL_SMALL_TUPLE_EXT :
-	case ERL_LARGE_TUPLE_EXT :
-		ei_decode_tuple_header(buf->buff, &buf->index, &arity);
-		if (ei_decode_atom(buf->buff, &buf->index, tupletag)) {
-			ei_x_encode_tuple_header(rbuf, 2);
-			ei_x_encode_atom(rbuf, "error");
-			ei_x_encode_atom(rbuf, "badarg");
-			break;
-		}
-
-		if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) {
-			char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
-
-			if (ei_decode_string(buf->buff, &buf->index, uuid_str)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			ei_x_buff *nbuf = switch_core_alloc(listener->pool, sizeof(nbuf));
-			/*char *wtf = "hello world";*/
-			nbuf->buff = switch_core_alloc(listener->pool, buf->buffsz);
-			memcpy(nbuf->buff, buf->buff, buf->buffsz);
-			/*memcpy(nbuf, wtf, 20);*/
-			nbuf->index = buf->index;
-			nbuf->buffsz = buf->buffsz;
-
-			/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "stored %d %d %s\n", buf->index, buf->buffsz, nbuf);*/
-
-			switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf);
-
-		} else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) {
-			if (arity == 2) {
-				switch_log_level_t ltype = SWITCH_LOG_DEBUG;
-				char loglevelstr[MAXATOMLEN];
-				if (ei_decode_atom(buf->buff, &buf->index, loglevelstr)) {
-					ei_x_encode_tuple_header(rbuf, 2);
-					ei_x_encode_atom(rbuf, "error");
-					ei_x_encode_atom(rbuf, "badarg");
-					break;
-				}
-				ltype = switch_log_str2level(loglevelstr);
-
-				if (ltype && ltype != SWITCH_LOG_INVALID) {
-					listener->level = ltype;
-				} else {
-					ei_x_encode_tuple_header(rbuf, 2);
-					ei_x_encode_atom(rbuf, "error");
-					ei_x_encode_atom(rbuf, "badarg");
-					break;
-				}
-			} else {
-				/* tuple too long */
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-		} else if (!strncmp(tupletag, "event", MAXATOMLEN)) {
-			if (arity == 1) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			int custom = 0;
-			switch_event_types_t type;
-
-			if (!switch_test_flag(listener, LFLAG_EVENTS)) {
-				switch_set_flag_locked(listener, LFLAG_EVENTS);
-			}
-
-			for (int i = 1; i < arity; i++) {
-				if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
-
-					if (custom) {
-						switch_core_hash_insert(listener->event_hash, atom, MARKER);
-					} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
-						if (type == SWITCH_EVENT_ALL) {
-							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n");
-							uint32_t x = 0;
-							for (x = 0; x < SWITCH_EVENT_ALL; x++) {
-								listener->event_list[x] = 1;
-							}
-						}
-						if (type <= SWITCH_EVENT_ALL) {
-							listener->event_list[type] = 1;
-						}
-						if (type == SWITCH_EVENT_CUSTOM) {
-							custom++;
-						}
-
-					}
-					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
-				}
-			}
-		} else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) {
-			if (arity == 1) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			int custom = 0;
-			switch_event_types_t type;
+	switch_event_t *call_event=NULL;
+	switch_channel_t *channel=NULL;
 
-			for (int i = 1; i < arity; i++) {
-				if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
-
-					if (custom) {
-						switch_core_hash_delete(listener->event_hash, atom);
-					} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
-						uint32_t x = 0;
-
-						if (type == SWITCH_EVENT_CUSTOM) {
-							custom++;
-						} else if (type == SWITCH_EVENT_ALL) {
-							for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
-								listener->event_list[x] = 0;
-							}
-						} else {
-							if (listener->event_list[SWITCH_EVENT_ALL]) {
-								listener->event_list[SWITCH_EVENT_ALL] = 0;
-								for (x = 0; x < SWITCH_EVENT_ALL; x++) {
-									listener->event_list[x] = 1;
-								}
-							}
-							listener->event_list[type] = 0;
-						}
-					}
-				}
-			}
-		} else if (!strncmp(tupletag, "api", MAXATOMLEN)) {
-			if (arity < 3) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			char api_cmd[MAXATOMLEN];
-			char arg[1024];
+	/* Send a message to the associated registered process to let it know there is a call.
+	   Message is a tuple of the form {call, <call-event>}
+	*/
+	channel = switch_core_session_get_channel(session);
+	if (switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA) != SWITCH_STATUS_SUCCESS) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n");
+		return SWITCH_STATUS_MEMERR;
+	}
+	switch_caller_profile_event_set_data(switch_channel_get_caller_profile(channel), "Channel", call_event);
+	switch_channel_event_set_data(channel, call_event);
+	switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Content-Type", "command/reply");
+	switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Reply-Text", "+OK\n");
 	
-			if (ei_decode_atom(buf->buff, &buf->index, api_cmd)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			if (ei_decode_string(buf->buff, &buf->index, arg)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-			struct api_command_struct acs = { 0 };
-			acs.listener = listener;
-			acs.api_cmd = api_cmd;
-			acs.arg = arg;
-			acs.bg = 0;
-			acs.pid = msg->from;
-			api_exec(NULL, (void *) &acs);
-			goto noreply;
-
-		} else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) {
-			if (arity < 3) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			char api_cmd[MAXATOMLEN];
-			char arg[1024];
-
-			if (ei_decode_atom(buf->buff, &buf->index, api_cmd)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			if (ei_decode_string(buf->buff, &buf->index, arg)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			struct api_command_struct *acs = NULL;
-			switch_memory_pool_t *pool;
-			switch_thread_t *thread;
-			switch_threadattr_t *thd_attr = NULL;
-			switch_uuid_t uuid;
-
-			switch_core_new_memory_pool(&pool);
-			acs = switch_core_alloc(pool, sizeof(*acs));
-			switch_assert(acs);
-			acs->pool = pool;
-			acs->listener = listener;
-			acs->api_cmd = switch_core_strdup(acs->pool, api_cmd);
-			acs->arg = switch_core_strdup(acs->pool, arg);
-			acs->bg = 1;
-			acs->pid = msg->from;
-
-			switch_threadattr_create(&thd_attr, acs->pool);
-			switch_threadattr_detach_set(thd_attr, 1);
-			switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
-
-			switch_uuid_get(&uuid);
-			switch_uuid_format(acs->uuid_str, &uuid);
-			switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool);
-
-			ei_x_encode_tuple_header(rbuf, 2);
-			ei_x_encode_atom(rbuf, "ok");
-			ei_x_encode_string(rbuf, acs->uuid_str);
-
-			break;
-		} else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) {
-			char ename[MAXATOMLEN];
-
-			if (ei_decode_atom(buf->buff, &buf->index, ename)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			int headerlength;
-
-			if (ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			switch_event_types_t etype;
-			if (switch_name_event(ename, &etype) == SWITCH_STATUS_SUCCESS) {
-				switch_event_t *event;
-
-				if (switch_event_create(&event, etype) == SWITCH_STATUS_SUCCESS) {
-
-					char key[1024];
-					char value[1024];
-					int i = 0;
-					while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
-						i++;
-						if (ei_decode_string(buf->buff, &buf->index, key))
-							goto sendevent_fail;
-						if (ei_decode_string(buf->buff, &buf->index, value))
-							goto sendevent_fail;
-
-						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value);
-					}
-
-					if (headerlength != i)
-						goto sendevent_fail;
-					
-
-					switch_event_fire(&event);
-					ei_x_encode_atom(rbuf, "ok");
-					break;
+	ei_x_buff lbuf;
+	ei_x_new_with_version(&lbuf);
+	ei_x_encode_tuple_header(&lbuf, 2);
+	ei_x_encode_atom(&lbuf, "call");
+	ei_encode_switch_event(&lbuf, call_event);
+	switch_mutex_lock(listener->sock_mutex);
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending initial call event\n");
+	if (ei_reg_send(listener->ec,listener->sockfd, reg_name, lbuf.buff, lbuf.index)==ERL_ERROR) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send call event\n");
+	}
+	switch_mutex_unlock(listener->sock_mutex);
+	
+	ei_x_free(&lbuf);
+	return SWITCH_STATUS_SUCCESS;
+}
 
-sendevent_fail:
-					ei_x_encode_tuple_header(rbuf, 2);
-					ei_x_encode_atom(rbuf, "error");
-					ei_x_encode_atom(rbuf, "badarg");
-					break;
-				}
-			}
-		} else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) {
-			char uuid[37];
+static switch_status_t check_attached_sessions(listener_t *listener)
+{
+	session_elem_t *last,*sp;
+	switch_status_t status = SWITCH_STATUS_SUCCESS;
+	void *pop;
+	/* Check up on all the attached sessions:
+	   - if the initial call event has not yet been sent to the associated erlang process, send it;
+	   - if there is a pending event in the session's queue, send it;
+	   - if that event was a hangup, unlink the session from this listener.
+	*/
+	switch_mutex_lock(listener->session_mutex);
+	sp = listener->session_list;
+	last = NULL;
+	while(sp) {
+		if (!switch_test_flag(sp, LFLAG_OUTBOUND_INIT)) {
+			status = notify_new_session(listener, sp->session, sp->reg_name);
+			if (status != SWITCH_STATUS_SUCCESS)
+				break;
+			switch_set_flag(sp, LFLAG_OUTBOUND_INIT);
+		}
+		/* check event queue for this session */
+		if (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {			   
+			switch_event_t *pevent = (switch_event_t *) pop;
 			
-			if (ei_decode_string(buf->buff, &buf->index, uuid)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			switch_core_session_t *session;
-			if (!switch_strlen_zero(uuid) && (session = switch_core_session_locate(uuid))) {
-			} else {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "nosession");
-				break;
-			}
-
-			int headerlength;
-
-			if (ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			switch_event_t *event;
-
-			if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) == SWITCH_STATUS_SUCCESS) {
-
-				char key[1024];
-				char value[1024];
-				int i = 0;
-				while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
-					i++;
-					if (ei_decode_string(buf->buff, &buf->index, key))
-						goto sendmsg_fail;
-					if (ei_decode_string(buf->buff, &buf->index, value))
-						goto sendmsg_fail;
-
-					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value);
-				}
-
-				if (headerlength != i)
-					goto sendmsg_fail;
-
-				if (switch_core_session_queue_private_event(session, &event) == SWITCH_STATUS_SUCCESS) {
-					ei_x_encode_atom(rbuf, "ok");
-				} else {
-					ei_x_encode_tuple_header(rbuf, 2);
-					ei_x_encode_atom(rbuf, "error");
-					ei_x_encode_atom(rbuf, "badmem");
-				}
-
-				/* release the lock returned by switch_core_locate_session */
-				switch_core_session_rwunlock(session);
-				break;
-
-sendmsg_fail:
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-		} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
-
-			/* format is (result|config|directory|dialplan|phrases)  */
-			char sectionstr[MAXATOMLEN];
-
-			if (ei_decode_atom(buf->buff, &buf->index, sectionstr)) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			switch_xml_section_t section;
-
-			if (!(section = switch_xml_parse_section_string(sectionstr))) {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badarg");
-				break;
-			}
-
-			struct erlang_binding *binding, *ptr;
-
-			if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
-				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "badmem");
-				break;
-			}
-
-			binding->section = section;
-			binding->pid = msg->from;
-			binding->listener = listener;
-
-			switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
-
-			switch_mutex_lock(globals.listener_mutex);
-
-			for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
-
-			if (ptr) {
-				ptr->next = binding;
-			} else {
-				bindings.head = binding;
-			}
+			/* events from attached sessions are wrapped in a {call_event,<EVT>} tuple 
+			   to distinguish them from normal events (if they are sent to the same process)
+			 */
+			ei_x_buff ebuf;
+			ei_x_new_with_version(&ebuf);		 
+			ei_x_encode_tuple_header(&ebuf, 2);
+			ei_x_encode_atom(&ebuf, "call_event");
+			ei_encode_switch_event(&ebuf, pevent);
 			
-			switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
-			switch_mutex_unlock(globals.listener_mutex);
-
-			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
-
-			ei_link(listener, ei_self(listener->ec), &msg->from);
-
-		} else {
-			ei_x_encode_tuple_header(rbuf, 2);
-			ei_x_encode_atom(rbuf, "error");
-			ei_x_encode_atom(rbuf, "undef");
-			break;
-		}
-
-		ei_x_encode_atom(rbuf, "ok");
-		break;
-	case ERL_ATOM_EXT :
-		if (ei_decode_atom(buf->buff, &buf->index, atom)) {
-			ei_x_encode_tuple_header(rbuf, 2);
-			ei_x_encode_atom(rbuf, "error");
-			ei_x_encode_atom(rbuf, "badarg");
-			break;
-		}
+			switch_mutex_lock(listener->sock_mutex);
+			ei_reg_send(listener->ec, listener->sockfd, sp->reg_name, ebuf.buff, ebuf.index);
+			switch_mutex_unlock(listener->sock_mutex);
+
+			/* event is a hangup, so this session can be removed */
+			if (pevent->event_id == SWITCH_EVENT_CHANNEL_HANGUP) {
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hangup event for attached session\n");
+
+				/* remove session from list */
+				if (last)
+					last->next = sp->next;
+				else
+					listener->session_list = sp->next;
+					
+				/* this allows the application threads to exit */
+				switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE);				
 
-		if (!strncmp(atom, "nolog", MAXATOMLEN)) {
-			if (switch_test_flag(listener, LFLAG_LOG)) {
-				switch_clear_flag_locked(listener, LFLAG_LOG);
-			}
-		} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
-			ei_link(listener, ei_self(listener->ec), &msg->from);
-			listener->log_pid = msg->from;
-			listener->level = SWITCH_LOG_DEBUG;
-			switch_set_flag(listener, LFLAG_LOG);
-		} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
-			ei_link(listener, ei_self(listener->ec), &msg->from);
-			listener->event_pid = msg->from;
-			if (!switch_test_flag(listener, LFLAG_EVENTS)) {
-				switch_set_flag_locked(listener, LFLAG_EVENTS);
+				/* TODO
+				   if this listener was created outbound, and the last session has been detached
+				   should the listener also exit? Does it matter?
+				 */
 			}
-		} else if (!strncmp(atom, "noevents", MAXATOMLEN)) {
-			void *pop;
-			/*purge the event queue */
-			while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
-
-			if (switch_test_flag(listener, LFLAG_EVENTS)) {
-				uint8_t x = 0;
-				switch_clear_flag_locked(listener, LFLAG_EVENTS);
-				for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
-					listener->event_list[x] = 0;
-				}
-				/* wipe the hash */
-				switch_core_hash_destroy(&listener->event_hash);
-				switch_core_hash_init(&listener->event_hash, listener->pool);
-			} else {
-				ei_x_encode_tuple_header(rbuf, 2);
-				ei_x_encode_atom(rbuf, "error");
-				ei_x_encode_atom(rbuf, "notlistening");
-				break;
-			}
-		} else if (!strncmp(atom, "exit", MAXATOMLEN)) {
-			switch_clear_flag_locked(listener, LFLAG_RUNNING);
-			ei_x_encode_atom(rbuf, "ok");
-			goto event_done;
-		} else if (!strncmp(atom, "getpid", MAXATOMLEN)) {
-			ei_x_encode_tuple_header(rbuf, 2);
-			ei_x_encode_atom(rbuf, "ok");
-			ei_x_encode_pid(rbuf, ei_self(listener->ec));
-		} else if (!strncmp(atom, "link", MAXATOMLEN)) {
-			/* debugging */
-			ei_link(listener, ei_self(listener->ec), &msg->from);
-			goto noreply;
-		} else {
-			ei_x_encode_tuple_header(rbuf, 2);
-			ei_x_encode_atom(rbuf, "error");
-			ei_x_encode_atom(rbuf, "undef");
-			break;
+			
+			ei_x_free(&ebuf);
+			switch_event_destroy(&pevent);
 		}
-
-		ei_x_encode_atom(rbuf, "ok");
-		break;
-	default :
-		/* some other kind of erlang term */
-		ei_x_encode_tuple_header(rbuf, 2);
-		ei_x_encode_atom(rbuf, "error");
-		ei_x_encode_atom(rbuf, "undef");
-		break;
+		last = sp;
+		sp = sp->next;
 	}
-
-	
-	switch_mutex_lock(listener->sock_mutex);
-	ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
-	switch_mutex_unlock(listener->sock_mutex);
-noreply:
-	return 0;
-
-event_done:
-	switch_mutex_lock(listener->sock_mutex);
-	ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
-	switch_mutex_unlock(listener->sock_mutex);
-	return 1;
+	switch_mutex_unlock(listener->session_mutex);
+	return status;
 }
 
-
-static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
+static void check_log_queue(listener_t *listener)
 {
-	listener_t *listener = (listener_t *) obj;
-	switch_core_session_t *session = NULL;
-	switch_channel_t *channel = NULL;
-	int status = 1;
 	void *pop;
 
-	switch_mutex_lock(globals.listener_mutex);
-	prefs.threads++;
-	switch_mutex_unlock(globals.listener_mutex);
-
-	switch_assert(listener != NULL);
-	
-	if (prefs.acl_count && !switch_strlen_zero(listener->remote_ip)) {
-		uint32_t x = 0;
-		for (x = 0; x < prefs.acl_count; x++) {
-			if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
-				erlang_msg msg;
-
-				ei_x_buff buf;
-				ei_x_new(&buf);
-
-				status = ei_xreceive_msg(listener->sockfd, &msg, &buf);
-				/* get data off the socket, just so we can get the pid on the other end */
-				if (status == ERL_MSG) {
-					/* if we got a message, return an ACL error. */
-					ei_x_buff rbuf;
-					ei_x_new_with_version(&rbuf);
-
-					ei_x_encode_tuple_header(&rbuf, 2);
-					ei_x_encode_atom(&rbuf, "error");
-					ei_x_encode_atom(&rbuf, "acldeny");
-
-					ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index);
-
-					ei_x_free(&rbuf);
-				}
-
-				ei_x_free(&buf);
-
-				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection from %s denied by acl %s\n", listener->remote_ip, prefs.acl[x]);
-				goto done;
+	/* send out any pending crap in the log queue */
+	if (switch_test_flag(listener, LFLAG_LOG)) {
+		if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+			switch_log_node_t *dnode = (switch_log_node_t *) pop;
+			
+			if (dnode->data) {
+				ei_x_buff lbuf;
+				ei_x_new_with_version(&lbuf);
+				ei_x_encode_tuple_header(&lbuf, 2);
+				ei_x_encode_atom(&lbuf, "log");
+				ei_x_encode_list_header(&lbuf, 6);
+				
+				ei_x_encode_tuple_header(&lbuf, 2);
+				ei_x_encode_atom(&lbuf, "level");
+				ei_x_encode_char(&lbuf, (unsigned char)dnode->level);
+				
+				ei_x_encode_tuple_header(&lbuf, 2);
+				ei_x_encode_atom(&lbuf, "text_channel");
+				ei_x_encode_char(&lbuf, (unsigned char)dnode->level);
+				
+				ei_x_encode_tuple_header(&lbuf, 2);
+				ei_x_encode_atom(&lbuf, "file");
+				ei_x_encode_string(&lbuf, dnode->file);
+				
+				ei_x_encode_tuple_header(&lbuf, 2);
+				ei_x_encode_atom(&lbuf, "func");
+				ei_x_encode_string(&lbuf, dnode->func);
+				
+				ei_x_encode_tuple_header(&lbuf, 2);
+				ei_x_encode_atom(&lbuf, "line");
+				ei_x_encode_ulong(&lbuf, (unsigned long)dnode->line);
+				
+				ei_x_encode_tuple_header(&lbuf, 2);
+				ei_x_encode_atom(&lbuf, "data");
+				ei_x_encode_string(&lbuf, dnode->data);
+				
+				ei_x_encode_empty_list(&lbuf);
+				
+				switch_mutex_lock(listener->sock_mutex);
+				ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index);
+				switch_mutex_unlock(listener->sock_mutex);
+				
+				ei_x_free(&lbuf);
+				free(dnode->data);
+				free(dnode);
 			}
 		}
 	}
+}
 
-	if ((session = listener->session)) {
-		channel = switch_core_session_get_channel(session);
-		if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
-			goto done;
+static void check_event_queue(listener_t *listener)
+{
+	void* pop;
+	/* send out any pending crap in the event queue */
+	if (switch_test_flag(listener, LFLAG_EVENTS)) {
+		if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+			
+			switch_event_t *pevent = (switch_event_t *) pop;
+			
+			ei_x_buff ebuf;
+			ei_x_new_with_version(&ebuf);
+			
+			ei_encode_switch_event(&ebuf, pevent);
+			
+			switch_mutex_lock(listener->sock_mutex);
+			ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index);
+			switch_mutex_unlock(listener->sock_mutex);
+			
+			ei_x_free(&ebuf);
+			switch_event_destroy(&pevent);
 		}
 	}
+}
 
-	if (switch_strlen_zero(listener->remote_ip)) {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open\n");
-	} else {
-		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip);/*, listener->remote_port);*/
-	}
-
-	switch_set_flag_locked(listener, LFLAG_RUNNING);
-	add_listener(listener);
+static void listener_main_loop(listener_t *listener) 
+{
+	int status = 1;
 
 	while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) {
 		erlang_msg msg;
@@ -1237,13 +595,13 @@
 					case ERL_SEND :
 						/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n");*/
 						if (handle_msg(listener, &msg, &buf, &rbuf)) {
-							goto done;
+							return;
 						}
 						break;
 					case ERL_REG_SEND :
 						/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send\n");*/
 						if (handle_msg(listener, &msg, &buf, &rbuf)) {
-							goto done;
+						    return;
 						}
 						break;
 					case ERL_LINK :
@@ -1275,77 +633,76 @@
 		ei_x_free(&buf);
 		ei_x_free(&rbuf);
 
-		/* send out any pending crap in the log queue */
-		if (switch_test_flag(listener, LFLAG_LOG)) {
-			if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
-				switch_log_node_t *dnode = (switch_log_node_t *) pop;
-
-				if (dnode->data) {
-					ei_x_buff lbuf;
-					ei_x_new_with_version(&lbuf);
-					ei_x_encode_tuple_header(&lbuf, 2);
-					ei_x_encode_atom(&lbuf, "log");
-					ei_x_encode_list_header(&lbuf, 6);
-
-					ei_x_encode_tuple_header(&lbuf, 2);
-					ei_x_encode_atom(&lbuf, "level");
-					ei_x_encode_char(&lbuf, (unsigned char)dnode->level);
-
-					ei_x_encode_tuple_header(&lbuf, 2);
-					ei_x_encode_atom(&lbuf, "text_channel");
-					ei_x_encode_char(&lbuf, (unsigned char)dnode->level);
-
-					ei_x_encode_tuple_header(&lbuf, 2);
-					ei_x_encode_atom(&lbuf, "file");
-					ei_x_encode_string(&lbuf, dnode->file);
-	
-					ei_x_encode_tuple_header(&lbuf, 2);
-					ei_x_encode_atom(&lbuf, "func");
-					ei_x_encode_string(&lbuf, dnode->func);
-
-					ei_x_encode_tuple_header(&lbuf, 2);
-					ei_x_encode_atom(&lbuf, "line");
-					ei_x_encode_ulong(&lbuf, (unsigned long)dnode->line);
-
-					ei_x_encode_tuple_header(&lbuf, 2);
-					ei_x_encode_atom(&lbuf, "data");
-					ei_x_encode_string(&lbuf, dnode->data);
-
-					ei_x_encode_empty_list(&lbuf);
-
-					switch_mutex_lock(listener->sock_mutex);
-					ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index);
-					switch_mutex_unlock(listener->sock_mutex);
-
-					ei_x_free(&lbuf);
-					free(dnode->data);
-					free(dnode);
-				}
-			}
+		check_log_queue(listener);
+		check_event_queue(listener);
+		if (SWITCH_STATUS_SUCCESS != check_attached_sessions(listener)) {
+			return;
 		}
+	}
+}
 
-		/* ditto with the event queue */
-		if (switch_test_flag(listener, LFLAG_EVENTS)) {
-			if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+static switch_bool_t check_inbound_acl(listener_t* listener)
+{
+	/* Inbound ACL check; returns SWITCH_TRUE when the connection may proceed. The check is skipped when no ACLs are configured or remote_ip is empty; otherwise the peer is refused as soon as switch_check_network_list_ip() fails for any configured list. */
+	if (prefs.acl_count && !switch_strlen_zero(listener->remote_ip)) {
+		uint32_t x = 0;
+		for (x = 0; x < prefs.acl_count; x++) {
+			if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
+				int status = 1;
+				erlang_msg msg;
 
-				switch_event_t *pevent = (switch_event_t *) pop;
+				ei_x_buff buf;
+				ei_x_new(&buf);
 
-				ei_x_buff ebuf;
-				ei_x_new_with_version(&ebuf);
+				status = ei_xreceive_msg(listener->sockfd, &msg, &buf);
+				/* one message is read off the socket only to learn the sender's pid (msg.from) for the reply */
+				if (status == ERL_MSG) {
+					/* a message arrived: answer its sender with the tuple {error, acldeny} */
+					ei_x_buff rbuf;
+					ei_x_new_with_version(&rbuf);
 
-				ei_encode_switch_event(&ebuf, pevent);
+					ei_x_encode_tuple_header(&rbuf, 2);
+					ei_x_encode_atom(&rbuf, "error");
+					ei_x_encode_atom(&rbuf, "acldeny");
 
-				switch_mutex_lock(listener->sock_mutex);
-				ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index);
-				switch_mutex_unlock(listener->sock_mutex);
+					ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index);
+
+					ei_x_free(&rbuf);
+				}
+
+				ei_x_free(&buf);
 
-				ei_x_free(&ebuf);
-				switch_event_destroy(&pevent);
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection from %s denied by acl %s\n", listener->remote_ip, prefs.acl[x]);
+				return SWITCH_FALSE;
 			}
 		}
 	}
+	return SWITCH_TRUE;
+}
 
-done:
+static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
+{
+	listener_t *listener = (listener_t *) obj;
+	session_elem_t* s;
+
+	switch_mutex_lock(globals.listener_mutex);
+	prefs.threads++;
+	switch_mutex_unlock(globals.listener_mutex);
+
+	switch_assert(listener != NULL);
+
+	if (check_inbound_acl(listener)) {
+		if (switch_strlen_zero(listener->remote_ip)) {
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open\n");
+		} else {
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip);/*, listener->remote_port);*/
+		}
+		
+		add_listener(listener);
+  		listener_main_loop(listener);
+	}
+
+	/* clean up */
 	remove_listener(listener);
 
 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
@@ -1364,16 +721,22 @@
 	/* remove any bindings for this connection */
 	remove_binding(listener, NULL);
 
-	if (listener->session) {
-		switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
-		switch_clear_flag_locked(listener, LFLAG_SESSION);
-		switch_core_session_rwunlock(listener->session);
-	} else if (listener->pool) {
+	/* clean up all the attached sessions */
+	switch_mutex_lock(listener->session_mutex);
+	for (s = listener->session_list; s; s = s->next) {
+		switch_channel_clear_flag(switch_core_session_get_channel(s->session), CF_CONTROLLED);
+		/* this allows the application threads to exit */
+		switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE);
+		/* */
+		switch_core_session_rwunlock(s->session);
+	}
+	switch_mutex_unlock(listener->session_mutex);
+
+	if (listener->pool) {
 		switch_memory_pool_t *pool = listener->pool;
 		switch_core_destroy_memory_pool(&pool);
 	}
 
-
 	switch_mutex_lock(globals.listener_mutex);
 	prefs.threads--;
 	switch_mutex_unlock(globals.listener_mutex);
@@ -1454,12 +817,148 @@
 	return 0;
 }
 
+/* Allocate and initialise a listener for an already-established connection.
+ *
+ * A dedicated memory pool is created and the listener itself, its rwlock,
+ * queues, mutexes and event hash are all allocated from it; the pool is
+ * remembered in listener->pool.
+ *
+ * ec        stored by pointer, NOT copied - the caller must keep it valid for
+ *           as long as the listener uses it.
+ * clientfd  connected socket, stored in listener->sockfd.
+ *
+ * Returns the new listener, or NULL (after logging) when the pool or the
+ * listener cannot be allocated. No pool is left behind on failure.
+ */
+static listener_t* new_listener(struct ei_cnode_s *ec, int clientfd)
+{
+	switch_memory_pool_t *listener_pool = NULL;
+	listener_t* listener = NULL;
+
+	if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
+		return NULL;
+	}
+
+	if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
+		/* nothing owns the pool yet, so release it here instead of leaking it */
+		switch_core_destroy_memory_pool(&listener_pool);
+		return NULL;
+	}
+
+	switch_thread_rwlock_create(&listener->rwlock, listener_pool);
+	switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
+	switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
+
+	listener->ec = ec;
+	listener->sockfd = clientfd;
+	/* from here on the listener owns the pool it lives in */
+	listener->pool = listener_pool;
+	listener->level = SWITCH_LOG_DEBUG;
+	switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+	switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+	switch_mutex_init(&listener->session_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+	switch_core_hash_init(&listener->event_hash, listener->pool);
+
+	return listener;
+}
+
+/* Open an outbound connection to the Erlang node `node` and wrap it in a new
+ * listener whose peer_nodename is a pool-owned copy of `node`.
+ *
+ * Returns NULL when ei cannot be initialised, the connection fails, or the
+ * listener cannot be allocated; the socket is closed again on the latter.
+ */
+static listener_t* new_outbound_listener(char* node)
+{
+	listener_t* listener = NULL;
+	struct ei_cnode_s ec;
+	struct ei_cnode_s *ec_copy = NULL;
+	int clientfd;
+
+	if (SWITCH_STATUS_SUCCESS != initialise_ei(&ec)) {
+		return NULL;
+	}
+
+	errno = 0;
+	if ((clientfd = ei_connect(&ec, node)) == ERL_ERROR) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n",node,erl_errno,errno);
+		return NULL;
+	}
+
+	/* new_listener() can return NULL; it logs the reason itself, so only the socket needs giving back */
+	if (!(listener = new_listener(&ec, clientfd))) {
+		close(clientfd);
+		return NULL;
+	}
+
+	/* new_listener() stores the pointer it is given, and `ec` dies with this
+	 * stack frame: hand the listener a copy that lives in its own pool */
+	if (!(ec_copy = switch_core_alloc(listener->pool, sizeof(*ec_copy)))) {
+		/* the listener lives inside the pool, so take the pool pointer out first */
+		switch_memory_pool_t *pool = listener->pool;
+
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
+		close(clientfd);
+		switch_core_destroy_memory_pool(&pool);
+		return NULL;
+	}
+	*ec_copy = ec;
+	listener->ec = ec_copy;
+
+	listener->peer_nodename = switch_core_strdup(listener->pool,node);
+	return listener;
+}
+
+/* Attach a call to a listener.
+ *
+ * Allocates a session list element from the session's own memory pool, takes
+ * a read lock on the session and links the element into the listener with
+ * add_session_elem_to_listener(). The element starts with LFLAG_SESSION_ALIVE
+ * set and LFLAG_OUTBOUND_INIT clear.
+ *
+ * Returns the attached element, or NULL when the allocation or the read lock
+ * fails; nothing is attached and no lock is held in that case.
+ */
+session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session)
+{
+	switch_memory_pool_t *session_pool = switch_core_session_get_pool(session);
+	session_elem_t* session_element = NULL;
+
+	/* create a session list element */
+	if (!(session_element = switch_core_alloc(session_pool, sizeof(*session_element)))) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n");
+		return NULL;
+	}
+
+	if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n");
+		/* report the failure: handing back the half-built, unattached element
+		 * would make the caller treat the call as attached */
+		return NULL;
+	}
+
+	session_element->session = session;
+	session_element->reg_name = switch_core_strdup(session_pool,reg_name);
+	switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
+	switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
+	switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, session_pool);
+	switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, session_pool);
+	/* attach the session to the listener */
+	add_session_elem_to_listener(listener,session_element);
+
+	return session_element;
+}
 
 /* Module Hooks */
 
+/* Entry point for outbound mode.
+ *
+ * The application data is "<registered name> <node>", split on spaces. The
+ * call is attached to the listener already talking to that node, or to a
+ * freshly connected one, and is then parked. If park returns with the channel
+ * in CS_HANGUP or later, this thread keeps polling until LFLAG_SESSION_ALIVE
+ * is cleared on the session element.
+ *
+ * `session` and `data` are not declared in the body; they are supplied by
+ * SWITCH_STANDARD_APP.
+ */
+SWITCH_STANDARD_APP(erlang_outbound_function)
+{
+	char *reg_name, *node;
+	listener_t *listener;
+	int argc = 0;
+	char *argv[80] = { 0 };
+	char *mydata;
+	switch_bool_t new_session = SWITCH_FALSE;
+	session_elem_t* session_element=NULL;
+
+	/* process app arguments */
+	if (data && (mydata = switch_core_session_strdup(session, data))) {
+		argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
+	}
+	if (argc < 2) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse Error - need registered name and node!\n");
+		return;
+	}
+	reg_name = argv[0];
+	if (switch_strlen_zero(reg_name)) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing registered name!\n");
+		return;
+	}
+	node = argv[1];
+	if (switch_strlen_zero(node)) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing node name!\n");
+		return;
+	}
+
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n",reg_name, node);
+
+	/* first work out if there is a listener already talking to the node we want to talk to */
+	listener = find_listener(node);
+	/* if there is no listener, then create one */
+	if (!listener) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new listener for session\n");
+		new_session = SWITCH_TRUE;
+		listener = new_outbound_listener(node);
+	}
+	else {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Using existing listener for session\n");
+	}
+
+	if (listener) {
+		session_element = attach_call_to_listener(listener,reg_name,session);
+	}
+
+	if (session_element) {
+		/* a listener created here has no thread yet; an existing one is already being serviced */
+		if (new_session) {
+			launch_listener_thread(listener);
+		}
+		switch_ivr_park(session, NULL);
+
+		/* keep app thread running for lifetime of session */
+		if (switch_channel_get_state(switch_core_session_get_channel(session)) >= CS_HANGUP) {
+			while (switch_test_flag(session_element, LFLAG_SESSION_ALIVE)) {
+				switch_yield(100000);
+			}
+		}
+	}
+	else if (listener && new_session) {
+		/* The listener was created above but no thread was launched for it,
+		 * so nothing else will ever release its socket and pool. The listener
+		 * lives inside the pool, hence the pool pointer is copied out first. */
+		switch_memory_pool_t *pool = listener->pool;
+
+		close(listener->sockfd);
+		switch_core_destroy_memory_pool(&pool);
+	}
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "exit erlang_outbound_function\n");
+}
+
 
 SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
 {
+	switch_application_interface_t *app_interface;
+
 	switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
 
 	if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
@@ -1483,6 +982,8 @@
 	/* connect my internal structure to the blank pointer passed to me */
 	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
 
+	SWITCH_ADD_APP(app_interface, "erlang", "Connect to an erlang node", "Connect to erlang", erlang_outbound_function, "<registered name> <node at host>", SAF_SUPPORT_NOMEDIA);
+
 	/* indicate that the module should continue to be loaded */
 	return SWITCH_STATUS_SUCCESS;
 }
@@ -1557,25 +1058,7 @@
 		switch_yield(100000);
 	}
 
-	struct hostent *nodehost = gethostbyaddr(&server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET);
-
-	char *thishostname = nodehost->h_name;
-	char thisnodename[MAXNODELEN+1];
-
-	if (!strcmp(thishostname, "localhost"))
-		gethostname(thishostname, EI_MAXHOSTNAMELEN);
-
-	if (prefs.shortname) {
-		char *off;
-		if ((off = strchr(thishostname, '.'))) {
-			*off = '\0';
-		}
-	}
-
-	snprintf(thisnodename, MAXNODELEN+1, "%s@%s", prefs.nodename, thishostname);
-
-	/* init the ei stuff */
-	if (ei_connect_xinit(&ec, thishostname, prefs.nodename, thisnodename, (Erl_IpAddr)(&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) {
+	if (SWITCH_STATUS_SUCCESS!=initialise_ei(&ec)) {
 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n");
 		close_socket(&listen_list.sockfd);
 		return SWITCH_STATUS_GENERR;
@@ -1596,7 +1079,7 @@
 		}
 	}
 
-	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode at %s port %u\n", thisnodename, prefs.port);
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode\n");
 
 	listen_list.ready = 1;
 
@@ -1621,59 +1104,42 @@
 			break;
 		}
 
-		if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
-			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
-			goto fail;
-		}
+		listener = new_listener(&ec,clientfd);
+		if (listener) {
+			/* store the IP and node name we are talking with */
+			inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip));
+			listener->peer_nodename = switch_core_strdup(listener->pool,conn.nodename);
 
-		if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
-			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
-			break;
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching listener, connection from node %s, ip %s\n", conn.nodename, listener->remote_ip);
+			launch_listener_thread(listener);
 		}
-
-		switch_thread_rwlock_create(&listener->rwlock, listener_pool);
-		switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
-		switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
-
-		inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip));
-
-		listener->ec = &ec;
-		listener->sockfd = clientfd;
-		listener->pool = listener_pool;
-		listener_pool = NULL;
-		listener->level = SWITCH_LOG_DEBUG;
-		switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
-		switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
-		switch_core_hash_init(&listener->event_hash, listener->pool);
-
-		launch_listener_thread(listener);
-
+		else
+			/* if we fail to create a listener (memory error), then the module will exit */
+			break;
 	}
 
+	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Exiting module mod_erlang_event\n");
+
 	/* cleanup epmd registration */
 	ei_unpublish(&ec);
 	close(epmdfd);
 
 	close_socket(&listen_list.sockfd);
-
 	if (pool) {
 		switch_core_destroy_memory_pool(&pool);
 	}
-
 	if (listener_pool) {
 		switch_core_destroy_memory_pool(&listener_pool);
 	}
-
-
 	for (x = 0; x < prefs.acl_count; x++) {
 		switch_safe_free(prefs.acl[x]);
 	}
 
-  fail:
 	prefs.done = 2;
 	return SWITCH_STATUS_TERM;
 }
 
+
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
 {
 	listener_t *l;

Added: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h
==============================================================================
--- (empty file)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h	Thu Jan 22 13:34:53 2009
@@ -0,0 +1,190 @@
+/* 
+ * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
+ * Copyright (C) 2005/2006, Anthony Minessale II <anthmct at yahoo.com>
+ *
+ * Version: MPL 1.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
+ *
+ * The Initial Developer of the Original Code is
+ * Anthony Minessale II <anthmct at yahoo.com>
+ * Portions created by the Initial Developer are Copyright (C)
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ * 
+ * Anthony Minessale II <anthmct at yahoo.com>
+ * Andrew Thompson <andrew at hijacked.us>
+ * Rob Charlton <rob.charlton at savageminds.com>
+ *
+ *
+ * mod_erlang_event.h -- Erlang Event Handler derived from mod_event_socket
+ *
+ */
+
+
+typedef enum { /* bit flags kept in session_elem.flags; every value must be a distinct power of two */
+	LFLAG_OUTBOUND_INIT = (1 << 0), /* Erlang peer has been notified of this session */
+	LFLAG_SESSION_ALIVE = (1 << 1) /* set when the call is attached to a listener; cleared by the listener thread on exit, which lets the application thread return */
+} session_flag_t;
+
+struct session_elem { /* one call (session) attached to a listener; allocated from the session's pool by attach_call_to_listener() */
+	switch_core_session_t *session; /* the attached call; read-locked on attach and rwunlocked by the listener thread's cleanup */
+	switch_mutex_t *flag_mutex; /* initialised on attach; presumably what the *_flag_locked macros take - confirm */
+	uint32_t flags; /* session_flag_t bits */
+	/* registered process name that will receive call notifications from this session */
+	char* reg_name;
+	switch_queue_t *event_queue; /* created on attach; NOTE(review): producer and consumer are outside this section */
+	struct session_elem *next; /* next element in the owning listener's session_list */
+};
+
+typedef struct session_elem session_elem_t;
+
+typedef enum { /* flags tested on a listener_t with switch_test_flag() */
+	LFLAG_RUNNING = (1 << 0), /* NOTE(review): set/clear sites are outside the visible new code */
+	LFLAG_EVENTS = (1 << 1), /* check_event_queue() only forwards events while this is set */
+	LFLAG_LOG = (1 << 2), /* presumably gates log forwarding the same way - confirm in check_log_queue() */
+	LFLAG_MYEVENTS = (1 << 3), /* NOTE(review): no use visible in this section */
+	LFLAG_STATEFUL = (1 << 4) /* NOTE(review): no use visible in this section */
+} event_flag_t;
+
+/* There is one listener for each Erlang node we are attached to - either
+   inbound or outbound. For example, if the erlang node node1@server connects
+   to freeswitch then a listener is created and handles commands sent from
+   that node. If 5 calls are directed to the outbound erlang application
+   via the dialplan, and are also set to talk to node1@server, then those
+   5 call sessions will be "attached" to the same listener.
+ */
+struct listener {
+	int sockfd; /* connected socket to the Erlang node; used for ei_send / ei_xreceive_msg */
+	struct ei_cnode_s *ec; /* stored by pointer in new_listener(), not copied */
+	erlang_pid log_pid; /* process that receives forwarded log messages */
+	erlang_pid event_pid; /* process that receives forwarded events */
+	char *peer_nodename; /* name of the remote node, duplicated into `pool` */
+	switch_queue_t *event_queue; /* drained one item at a time by check_event_queue() */
+	switch_queue_t *log_queue; /* pending log nodes for log_pid */
+	switch_memory_pool_t *pool; /* pool the listener itself is allocated from; destroying it frees the listener */
+	switch_mutex_t *flag_mutex;
+	switch_mutex_t *sock_mutex; /* held around ei_send on sockfd once the listener is running */
+	char *ebuf;
+	uint32_t flags; /* event_flag_t bits */
+	switch_log_level_t level; /* initialised to SWITCH_LOG_DEBUG */
+	uint8_t event_list[SWITCH_EVENT_ALL + 1];
+	switch_hash_t *event_hash; /* initialised in new_listener() */
+	switch_hash_t *fetch_reply_hash; /* NOTE(review): not initialised in new_listener() - confirm where it is set up */
+	switch_thread_rwlock_t *rwlock;
+	switch_mutex_t *session_mutex; /* held while walking session_list */
+	session_elem_t *session_list; /* calls attached to this listener */
+	int lost_events;
+	int lost_logs;
+	time_t last_flush;
+	uint32_t timeout;
+	uint32_t id;
+	char remote_ip[50]; /* textual peer address, filled with inet_ntop for inbound connections; not set by new_outbound_listener() */
+	/*switch_port_t remote_port;*/
+	struct listener *next; /* presumably links the module-wide listener list - confirm against add_listener() */
+};
+
+typedef struct listener listener_t;
+
+struct erlang_binding { /* NOTE(review): only remove_binding(listener, NULL) is visible in this section; field notes below are assumptions to confirm */
+	switch_xml_section_t section;
+	erlang_pid pid;
+	char *registered_process; /* TODO */
+	listener_t *listener; /* presumably the listener the binding belongs to - confirm */
+	struct erlang_binding *next; /* presumably the next binding in bindings.head's list - confirm */
+};
+
+struct api_command_struct { /* NOTE(review): no user of this struct is visible in this section; confirm field meanings in handle_msg.c */
+	char *api_cmd;
+	char *arg;
+	listener_t *listener;
+	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; /* sized for a formatted UUID plus the terminating NUL */
+	uint8_t bg;
+	erlang_pid pid;
+	switch_memory_pool_t *pool;
+};
+
+struct globals_struct {
+	switch_mutex_t *listener_mutex; /* created at module load; held around updates of prefs.threads */
+	switch_event_node_t *node; /* handle filled in by switch_event_bind_removable() at module load */
+};
+typedef struct globals_struct globals_t;
+
+struct listen_list_struct {
+	int sockfd; /* closed with close_socket() on init failure and at module exit; presumably the accepting socket - confirm */
+	switch_mutex_t *sock_mutex;
+	listener_t *listeners; /* presumably the head of the list of active listeners - confirm against add_listener() */
+	uint8_t ready; /* set to 1 once the cnode has been connected and published */
+};
+typedef struct listen_list_struct listen_list_t;
+
+struct bindings_struct { /* NOTE(review): uses are outside this section; field notes are assumptions to confirm */
+	struct erlang_binding *head; /* presumably the first erlang_binding in a list chained through ->next */
+	switch_xml_binding_t *search_binding;
+};
+typedef struct bindings_struct bindings_t;
+
+#define MAX_ACL 100 /* capacity of prefs.acl[] */
+struct prefs_struct {
+	switch_mutex_t *mutex;
+	char *ip;
+	char *nodename;
+	switch_bool_t shortname;
+	uint16_t port;
+	char *cookie;
+	int done; /* the listener main loop runs only while this is 0; set to 2 when the runtime function returns */
+	int threads; /* incremented when a listener thread starts and decremented when it ends, under globals.listener_mutex */
+	char *acl[MAX_ACL]; /* network list names checked by check_inbound_acl(); released with switch_safe_free() at module exit */
+	uint32_t acl_count; /* number of used entries in acl[]; 0 disables the inbound ACL check */
+	uint32_t id;
+};
+typedef struct prefs_struct prefs_t;
+
+/* shared globals: defined by a translation unit that defines DEFINE_GLOBALS before including this header, declared extern for every other includer */
+#ifdef DEFINE_GLOBALS
+globals_t globals;
+listen_list_t listen_list;
+bindings_t bindings;
+prefs_t prefs;
+#else
+extern globals_t globals;
+extern listen_list_t listen_list;
+extern bindings_t bindings;
+extern prefs_t prefs;
+#endif
+
+/* function prototypes */
+/* handle_msg.c */
+int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf);
+
+/* ei_helpers.c */
+void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to);
+void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event);
+void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag);
+switch_status_t initialise_ei(struct ei_cnode_s *ec);
+#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event")
+
+/* mod_erlang_event.c */
+session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session);
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4:
+ */



More information about the Freeswitch-svn mailing list