[Freeswitch-svn] [commit] r12475 - freeswitch/trunk/src/mod/event_handlers/mod_erlang_event
FreeSWITCH SVN
andrew at freeswitch.org
Thu Mar 5 13:54:16 PST 2009
Author: andrew
Date: Thu Mar 5 15:54:16 2009
New Revision: 12475
Log:
Rework how spawned outbound processes communicate their pid back to the outbound call process; also try to improve some memory management.
Modified:
freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c
freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c
freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h
Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c (original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c Thu Mar 5 15:54:16 2009
@@ -131,7 +131,6 @@
ei_x_buff buf;
ei_x_new(&buf);
ei_x_encode_list_header(&buf, 1);
- ei_init_ref(ec, ref);
ei_x_encode_ref(&buf, ref);
ei_x_encode_empty_list(&buf);
Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c (original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c Thu Mar 5 15:54:16 2009
@@ -700,7 +700,8 @@
static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
erlang_ref ref;
- erlang_pid *pid2, *pid = switch_core_alloc(listener->pool, sizeof(erlang_pid));
+ erlang_pid *pid;/* = switch_core_alloc(listener->pool, sizeof(erlang_pid));*/
+ void *p;
char hash[100];
int arity;
@@ -711,6 +712,14 @@
return SWITCH_STATUS_FALSE;
}
+ if (!(pid = malloc(sizeof(erlang_pid)))) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
+ ei_x_encode_tuple_header(rbuf, 2);
+ ei_x_encode_atom(rbuf, "error");
+ ei_x_encode_atom(rbuf, "badmem");
+ return SWITCH_STATUS_SUCCESS;
+ }
+
if (ei_decode_pid(buf->buff, &buf->index, pid)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n");
return SWITCH_STATUS_FALSE;
@@ -720,19 +729,36 @@
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash);
- if ((pid2 = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) {
- if (pid2 == NULL) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found unfilled slot for %s\n", hash);
+ if ((p = switch_core_hash_find(listener->spawn_pid_hash, hash))) {
+ if (p == &globals.TIMEOUT) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", hash);
+ switch_core_hash_delete(listener->spawn_pid_hash, hash);
+ ei_x_encode_tuple_header(rbuf, 2);
+ ei_x_encode_atom(rbuf, "error");
+ ei_x_encode_atom(rbuf, "timeout");
+ } else if (p == &globals.WAITING) {
+ /* update the key to point at a pid */
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", hash);
+ switch_core_hash_delete(listener->spawn_pid_hash, hash);
+ switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
+ return SWITCH_STATUS_FALSE; /*no reply */
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash);
+ ei_x_encode_tuple_header(rbuf, 2);
+ ei_x_encode_atom(rbuf, "error");
+ ei_x_encode_atom(rbuf, "duplicate_response");
}
} else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No slot for %s\n", hash);
- switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
+ /* nothin in the hash */
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", hash);
+ ei_x_encode_tuple_header(rbuf, 2);
+ ei_x_encode_atom(rbuf, "error");
+ ei_x_encode_atom(rbuf, "invalid_ref");
}
- /* no reply */
- return SWITCH_STATUS_FALSE;
+ free(pid); /* don't need it */
+
+ return SWITCH_STATUS_SUCCESS;
}
@@ -758,8 +784,7 @@
break;
case ERL_REFERENCE_EXT :
case ERL_NEW_REFERENCE_EXT :
- handle_ref_tuple(listener, msg, buf, rbuf);
- return 0;
+ ret = handle_ref_tuple(listener, msg, buf, rbuf);
default :
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d\n", type);
/* some other kind of erlang term */
Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c (original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c Thu Mar 5 15:54:16 2009
@@ -413,7 +413,12 @@
if (type != ERL_STRING_EXT && type != ERL_BINARY_EXT) /* XXX no unicode or character codes > 255 */
return NULL;
- char *xmlstr = switch_core_alloc(ptr->listener->pool, size + 1);
+ char *xmlstr;
+
+ if (!(xmlstr = malloc(size + 1))) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n");
+ return NULL;
+ }
ei_decode_string_or_binary(rep->buff, &rep->index, size, xmlstr);
@@ -431,7 +436,7 @@
/*switch_safe_free(rep->buff);*/
/*switch_safe_free(rep);*/
- /*switch_safe_free(xmlstr);*/
+ free(xmlstr);
return xml;
}
@@ -811,7 +816,6 @@
switch_mutex_unlock(globals.listener_mutex);
switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
- switch_core_hash_init(&listener->spawn_pid_hash, listener->pool);
switch_assert(listener != NULL);
@@ -1071,6 +1075,7 @@
else {
char hash[100];
int i = 0;
+ void *p = NULL;
session_element->session = session;
erlang_pid *pid;
erlang_ref ref;
@@ -1081,12 +1086,16 @@
/* attach the session to the listener */
add_session_elem_to_listener(listener,session_element);
+ ei_init_ref(listener->ec, &ref);
+ ei_hash_ref(&ref, hash);
+ /* insert the waiting marker */
+ switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING);
+
if (!strcmp(function, "!")) {
/* send a message to request a pid */
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
- ei_init_ref(listener->ec, &ref);
ei_x_encode_tuple_header(&rbuf, 3);
ei_x_encode_atom(&rbuf, "new_pid");
ei_x_encode_ref(&rbuf, &ref);
@@ -1107,23 +1116,27 @@
*/
}
- ei_hash_ref(&ref, hash);
-
- while (!(pid = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) {
+ /* loop until either we timeout or we get a value that's not the waiting marker */
+ while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) {
if (i > 50) { /* half a second timeout */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n");
switch_core_session_rwunlock(session);
remove_session_elem_from_listener(listener,session_element);
+ switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */
return NULL;
}
i++;
switch_yield(10000); /* 10ms */
}
+ switch_core_hash_delete(listener->spawn_pid_hash, hash);
+
+ pid = (erlang_pid *) p;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n");
session_element->process.type = ERLANG_PID;
memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
+ free(pid); /* malloced in handle_ref_tuple */
switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
@@ -1222,6 +1235,7 @@
}
if (module && function) {
+ switch_core_hash_init(&listener->spawn_pid_hash, listener->pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n");
session_element=attach_call_to_spawned_process(listener, module, function, session);
} else {
Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h (original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h Thu Mar 5 15:54:16 2009
@@ -136,6 +136,8 @@
unsigned int reference0;
unsigned int reference1;
unsigned int reference2;
+ char TIMEOUT; /* marker for a timed out request */
+ char WAITING; /* marker for a request waiting for a response */
switch_mutex_t *ref_mutex;
};
typedef struct globals_struct globals_t;
More information about the Freeswitch-svn
mailing list