[Freeswitch-svn] [commit] r10403 - freeswitch/trunk/src/mod/event_handlers/mod_erlang_event
FreeSWITCH SVN
andrew at freeswitch.org
Fri Nov 14 09:55:21 PST 2008
Author: andrew
Date: Fri Nov 14 12:55:20 2008
New Revision: 10403
Log:
Ton of stuff, mainly laying groundwork for bind_search functionality
Modified:
freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c (original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c Fri Nov 14 12:55:20 2008
@@ -53,14 +53,6 @@
LFLAG_STATEFUL = (1 << 8)
} event_flag_t;
-
-/* TODO - support multiple event handlers per erlang connection each with their own event filters? */
-struct event_handler {
- erlang_pid pid;
- switch_hash_t *event_hash;
- struct event_handler *next;
-};
-
struct listener {
int sockfd;
struct ei_cnode_s *ec;
@@ -70,6 +62,7 @@
switch_queue_t *log_queue;
switch_memory_pool_t *pool;
switch_mutex_t *flag_mutex;
+ switch_mutex_t *sock_mutex;
char *ebuf;
uint32_t flags;
switch_log_level_t level;
@@ -98,6 +91,19 @@
#define MAX_ACL 100
+struct erlang_binding { /* one node of the singly linked list of XML section bindings rooted at bindings.head */
+ switch_xml_section_t section; /* XML section this binding is matched against in erlang_fetch() */
+ erlang_pid pid; /* Erlang process that sent the bind request (copied from msg->from) */
+ char *registered_process; /* TODO */
+ listener_t *listener; /* connection that created the binding; remove_binding() matches on this */
+ struct erlang_binding *next; /* next node in the list, NULL at the tail */
+};
+
+static struct { /* module-wide state for the XML search bindings; zeroed with memset() at module load */
+ struct erlang_binding *head; /* first node of the binding list, NULL when the list is empty */
+ switch_xml_binding_t *search_binding; /* handle filled in by switch_xml_bind_search_function_ret() at module load */
+} bindings;
+
static struct {
switch_mutex_t *mutex;
char *ip;
@@ -163,6 +169,49 @@
}
+/* Stolen from code added to ei in R12B-5.
+ * Since not everyone has this version yet,
+ * provide our own version.
+ * */
+
+#define put8(s,n) do { \
+ (s)[0] = (char)((n) & 0xff); \
+ (s) += 1; \
+} while (0) /* put8: store the low 8 bits of n at s[0], then advance s by one byte (s must be a modifiable pointer) */
+
+#define put32be(s,n) do { \
+ (s)[0] = ((n) >> 24) & 0xff; \
+ (s)[1] = ((n) >> 16) & 0xff; \
+ (s)[2] = ((n) >> 8) & 0xff; \
+ (s)[3] = (n) & 0xff; \
+ (s) += 4; \
+} while (0) /* put32be: store n in s[0..3] most-significant byte first, then advance s by four bytes (s must be a modifiable pointer) */
+
+static void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) { /* hand-builds an ERL_LINK control message {ERL_LINK, from, to} and writes it to listener->sockfd; returns nothing */
+ char msgbuf[2048];
+ char *s;
+ int index = 0;
+ /*int n;*/
+ /* the message body is encoded first, starting past the space reserved for the header */
+ index = 5; /* skip the 5 header bytes that are filled in below; max sizes: */
+ ei_encode_version(msgbuf,&index); /* 1 */
+ ei_encode_tuple_header(msgbuf,&index,3);
+ ei_encode_long(msgbuf,&index,ERL_LINK);
+ ei_encode_pid(msgbuf,&index,from); /* 268 */
+ ei_encode_pid(msgbuf,&index,to); /* 268 */
+
+ /* 5 byte header missing: write it now at the start of msgbuf (4-byte big-endian length, then the pass-through tag) */
+ s = msgbuf;
+ put32be(s, index - 4); /* 4; the length counts everything after these four length bytes */
+ put8(s, ERL_PASS_THROUGH); /* 1 */
+ /* sum: 542 */
+ /* NOTE(review): the return value of write() below is not checked, so a short or failed write goes unnoticed */
+ switch_mutex_lock(listener->sock_mutex); /* hold the socket mutex for the duration of the write */
+ write(listener->sockfd, msgbuf, index);
+ switch_mutex_unlock(listener->sock_mutex);
+}
+
+
static void expire_listener(listener_t **listener)
{
void *pop;
@@ -180,6 +229,38 @@
}
+static void remove_binding(listener_t *listener) { /* walks the binding list under globals.listener_mutex, unlinking nodes owned by `listener` and OR-ing the sections of the other nodes back into the search binding's mask */
+ struct erlang_binding *ptr, *lst = NULL;
+
+ switch_mutex_lock(globals.listener_mutex);
+ /* reset the mask to the same base value passed at module load; surviving sections are re-added in the loop */
+ switch_xml_set_binding_sections(bindings.search_binding, (1 << sizeof(switch_xml_section_enum_t)));
+ /* lst trails ptr by one node. NOTE(review): lst is advanced to ptr even when ptr was just unlinked, so a second consecutive non-head match looks like it stays reachable from the live list - confirm */
+ for (ptr = bindings.head; ptr; lst = ptr, ptr = ptr->next) {
+ if (ptr->listener == listener) {
+ if (bindings.head == ptr) {
+ if (ptr->next) {
+ bindings.head = ptr->next;
+ } else {
+ bindings.head = NULL;
+ break; /* the list is now empty, nothing left to visit */
+ }
+ } else {
+ if (ptr->next) {
+ lst->next = ptr->next;
+ } else {
+ lst->next = NULL;
+ }
+ }
+ } else { /* node belongs to another listener: keep it and keep its section enabled */
+ switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | ptr->section);
+ }
+ }
+
+ switch_mutex_unlock(globals.listener_mutex);
+}
+
+
static void ei_encode_switch_event(ei_x_buff *ebuf, switch_event_t *event)
{
int i;
@@ -295,40 +376,6 @@
}
-SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
-{
- listener_t *l;
- int sanity = 0;
-
- prefs.done = 1;
-
- switch_log_unbind_logger(socket_logger);
-
- /*close_socket(&listen_list.sockfd);*/
-
- while (prefs.threads || prefs.done == 1) {
- switch_yield(10000);
- if (++sanity == 1000) {
- break;
- }
- }
-
- switch_event_unbind(&globals.node);
-
- switch_mutex_lock(globals.listener_mutex);
-
- for (l = listen_list.listeners; l; l = l->next) {
- close_socket(&l->sockfd);
- }
-
- switch_mutex_unlock(globals.listener_mutex);
-
- switch_sleep(1500000); /* sleep for 1.5 seconds */
-
- return SWITCH_STATUS_SUCCESS;
-}
-
-
static void add_listener(listener_t *listener)
{
/* add me to the listeners so I get events */
@@ -357,25 +404,6 @@
switch_mutex_unlock(globals.listener_mutex);
}
-SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
-{
- switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
-
- if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
- close_socket(&listen_list.sockfd);
- return SWITCH_STATUS_GENERR;
- }
-
- switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
-
- /* connect my internal structure to the blank pointer passed to me */
- *module_interface = switch_loadable_module_create_module_interface(pool, modname);
-
- /* indicate that the module should continue to be loaded */
- return SWITCH_STATUS_SUCCESS;
-}
-
struct api_command_struct {
char *api_cmd;
@@ -456,7 +484,9 @@
ei_x_encode_string(&ebuf, acs->uuid_str);
ei_x_encode_string(&ebuf, reply);
+ switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
+ switch_mutex_unlock(acs->listener->sock_mutex);
ei_x_free(&ebuf);
}
@@ -478,7 +508,11 @@
ei_x_encode_string(&rbuf, reply);
+
+ switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
+ switch_mutex_unlock(acs->listener->sock_mutex);
+
ei_x_free(&rbuf);
}
@@ -500,6 +534,38 @@
}
+static switch_xml_t erlang_fetch (const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value,
+ switch_event_t *params, void *user_data)
+{ /* XML search callback registered at module load: finds the first binding for the requested section; at this revision it only logs and always returns NULL. tag_name/key_name/key_value are only logged; params and user_data are unused */
+ switch_xml_t xml = NULL;
+ struct erlang_binding *ptr;
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n");
+
+ switch_xml_section_t section = switch_xml_parse_section_string((char *) sectionstr);
+ /* NOTE(review): bindings.head is walked here without taking globals.listener_mutex, which the bind handler and remove_binding() hold while modifying the list - confirm this is safe */
+ for (ptr = bindings.head; ptr && ptr->section != section; ptr = ptr->next); /* just get the first match */
+
+ if (!ptr) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
+ return NULL;
+ }
+
+ if (!ptr->listener) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n");
+ return NULL; /* our pointer is trash */
+ }
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->pid.node);
+
+ /* placeholder: the socket mutex is taken and released around a log call; nothing is sent to the bound process yet */
+ switch_mutex_lock(ptr->listener->sock_mutex);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "It's a lock!\n");
+ switch_mutex_unlock(ptr->listener->sock_mutex);
+ /* xml is never assigned above, so the value returned here is always NULL */
+ return xml;
+}
+
static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
@@ -829,6 +895,57 @@
break;
}
+ } else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
+
+ /* format is (result|config|directory|dialplan|phrases) */
+ char sectionstr[MAXATOMLEN];
+
+ if (ei_decode_atom(buf->buff, &buf->index, sectionstr)) {
+ ei_x_encode_tuple_header(rbuf, 2);
+ ei_x_encode_atom(rbuf, "error");
+ ei_x_encode_atom(rbuf, "badarg");
+ break;
+ }
+
+ switch_xml_section_t section;
+
+ if (!(section = switch_xml_parse_section_string(sectionstr))) {
+ ei_x_encode_tuple_header(rbuf, 2);
+ ei_x_encode_atom(rbuf, "error");
+ ei_x_encode_atom(rbuf, "badarg");
+ break;
+ }
+
+ struct erlang_binding *binding, *ptr;
+
+ if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
+ ei_x_encode_tuple_header(rbuf, 2);
+ ei_x_encode_atom(rbuf, "error");
+ ei_x_encode_atom(rbuf, "badmem");
+ break;
+ }
+
+ binding->section = section;
+ binding->pid = msg->from;
+ binding->listener = listener;
+
+ switch_mutex_lock(globals.listener_mutex);
+
+ for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
+
+ if (ptr) {
+ ptr->next = binding;
+ } else {
+ bindings.head = binding;
+ }
+
+ switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
+ switch_mutex_unlock(globals.listener_mutex);
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
+
+ ei_link(listener, ei_self(listener->ec), &msg->from);
} else {
ei_x_encode_tuple_header(rbuf, 2);
@@ -852,10 +969,12 @@
switch_clear_flag_locked(listener, LFLAG_LOG);
}
} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
+ ei_link(listener, ei_self(listener->ec), &msg->from);
listener->log_pid = msg->from;
listener->level = SWITCH_LOG_DEBUG;
switch_set_flag(listener, LFLAG_LOG);
} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
+ ei_link(listener, ei_self(listener->ec), &msg->from);
listener->event_pid = msg->from;
if (!switch_test_flag(listener, LFLAG_EVENTS)) {
switch_set_flag_locked(listener, LFLAG_EVENTS);
@@ -888,6 +1007,10 @@
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "ok");
ei_x_encode_pid(rbuf, ei_self(listener->ec));
+ } else if (!strncmp(atom, "link", MAXATOMLEN)) {
+ /* debugging */
+ ei_link(listener, ei_self(listener->ec), &msg->from);
+ goto noreply;
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
@@ -905,12 +1028,17 @@
break;
}
+
+ switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
+ switch_mutex_unlock(listener->sock_mutex);
noreply:
return 0;
event_done:
+ switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
+ switch_mutex_unlock(listener->sock_mutex);
return 1;
}
@@ -985,7 +1113,9 @@
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
+ switch_mutex_lock(listener->sock_mutex);
status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 100);
+ switch_mutex_unlock(listener->sock_mutex);
switch(status) {
case ERL_TICK :
@@ -1043,7 +1173,11 @@
ei_x_encode_tuple_header(&lbuf, 2);
ei_x_encode_atom(&lbuf, "log");
ei_x_encode_string(&lbuf, data);
+
+ switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index);
+ switch_mutex_unlock(listener->sock_mutex);
+
ei_x_free(&lbuf);
}
}
@@ -1060,7 +1194,9 @@
ei_encode_switch_event(&ebuf, pevent);
+ switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index);
+ switch_mutex_unlock(listener->sock_mutex);
ei_x_free(&ebuf);
switch_event_destroy(&pevent);
@@ -1084,6 +1220,9 @@
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n");
switch_core_hash_destroy(&listener->event_hash);
+ /* remove any bindings for this connection */
+ remove_binding(listener);
+
if (listener->session) {
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
switch_clear_flag_locked(listener, LFLAG_SESSION);
@@ -1093,6 +1232,7 @@
switch_core_destroy_memory_pool(&pool);
}
+
switch_mutex_lock(globals.listener_mutex);
prefs.threads--;
switch_mutex_unlock(globals.listener_mutex);
@@ -1111,6 +1251,7 @@
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
+
}
@@ -1173,6 +1314,39 @@
}
+/* Module Hooks */
+
+
+SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
+{ /* module load hook: creates the listener mutex, binds the event handler and logger, registers the XML search function and creates the module interface; returns SWITCH_STATUS_GENERR if either bind fails */
+ switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
+
+ if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
+ close_socket(&listen_list.sockfd);
+ return SWITCH_STATUS_GENERR;
+ }
+
+ switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
+
+ memset(&bindings, 0, sizeof(bindings)); /* start with an empty binding list and no search-binding handle */
+ /* register erlang_fetch as an XML search function, keeping the handle so the section mask can be changed as bindings come and go. NOTE(review): the initial mask is built from sizeof() of the enum type rather than from an enum value - confirm this is the intended placeholder */
+ if (switch_xml_bind_search_function_ret(erlang_fetch, (1 << sizeof(switch_xml_section_enum_t)), NULL, &bindings.search_binding) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
+ close_socket(&listen_list.sockfd);
+ return SWITCH_STATUS_GENERR; /* NOTE(review): the event binding and logger set up above are not undone on this path - confirm */
+ }
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
+
+ /* connect my internal structure to the blank pointer passed to me */
+ *module_interface = switch_loadable_module_create_module_interface(pool, modname);
+
+ /* indicate that the module should continue to be loaded */
+ return SWITCH_STATUS_SUCCESS;
+}
+
+
SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
{
switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
@@ -1328,6 +1502,7 @@
listener_pool = NULL;
listener->level = SWITCH_LOG_DEBUG;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+ switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
launch_listener_thread(listener);
@@ -1358,6 +1533,41 @@
return SWITCH_STATUS_TERM;
}
+SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
+{ /* module shutdown hook: sets prefs.done, unbinds the logger, event handler and XML search function, closes every listener socket, then always returns SWITCH_STATUS_SUCCESS */
+ listener_t *l;
+ int sanity = 0;
+
+ prefs.done = 1; /* presumably observed by the runtime loop as the request to stop - the reader of this flag is outside this view */
+
+ switch_log_unbind_logger(socket_logger);
+
+ /*close_socket(&listen_list.sockfd);*/
+ /* poll until no threads remain and prefs.done is no longer 1, giving up after 1000 polls */
+ while (prefs.threads || prefs.done == 1) {
+ switch_yield(10000);
+ if (++sanity == 1000) {
+ break;
+ }
+ }
+
+ switch_event_unbind(&globals.node);
+ switch_xml_unbind_search_function_ptr(erlang_fetch); /* undo the search binding made at module load */
+
+ switch_mutex_lock(globals.listener_mutex);
+ /* close the socket of every listener still on the list, under the listener mutex */
+ for (l = listen_list.listeners; l; l = l->next) {
+ close_socket(&l->sockfd);
+ }
+
+ switch_mutex_unlock(globals.listener_mutex);
+
+ switch_sleep(1500000); /* sleep for 1.5 seconds */
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+
/* For Emacs:
* Local Variables:
* mode:c
More information about the Freeswitch-svn
mailing list