[Freeswitch-svn] [commit] r11542 - freeswitch/trunk/src/mod/event_handlers/mod_erlang_event

FreeSWITCH SVN andrew at freeswitch.org
Wed Jan 28 11:26:37 PST 2009


Author: andrew
Date: Wed Jan 28 13:26:37 2009
New Revision: 11542

Log:
Patch from Rob Charlton to use rpc:call instead of spawn and to make the registered process argument to handlecall optional


Modified:
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
   freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/ei_helpers.c	Wed Jan 28 13:26:37 2009
@@ -122,6 +122,24 @@
 	ei_encode_switch_event_headers(ebuf, event);
 }
 
+/* Make an rpc call to the remote node to retrieve a pid: initialises *ref
+   and calls module:function(Ref). The response comes back later as
+   {rex, {Ref, Pid}}. Returns the status of ei_rpc_to (0 on success, else -1).
+ */
+int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function)
+{
+	ei_x_buff buf;
+	int status;
+
+	ei_x_new(&buf);
+	ei_init_ref(ec, ref);
+	ei_x_encode_list_header(&buf, 1);	/* argument list is [Ref] */
+	ei_x_encode_ref(&buf, ref);
+	ei_x_encode_empty_list(&buf);
+	status = ei_rpc_to(ec, sockfd, module, function, buf.buff, buf.index);
+	ei_x_free(&buf);	/* the buffer is released whether or not the send worked */
+	return status;
+}
 
 /* function to spawn a process on a remote node */
 int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv)

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/handle_msg.c	Wed Jan 28 13:26:37 2009
@@ -37,6 +37,8 @@
 
 static char *MARKER = "1";
 
+static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf);
+
 static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
 {
 	switch_bool_t r = SWITCH_TRUE;
@@ -519,15 +521,18 @@
 	return SWITCH_STATUS_SUCCESS;
 }
 
-/* {handlecall,<uuid>,<handler process registered name>} */
-static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+/* {handlecall,<uuid>,<handler process registered name>}
+   or
+   {handlecall,<uuid>} to send messages back to the sender
+ */
+static switch_status_t handle_msg_handlecall(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
 {
 	char reg_name[MAXATOMLEN];
 	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
 
-	if (arity != 3 ||
-		ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str) ||
-		ei_decode_atom(buf->buff, &buf->index, reg_name)) {
+	if (arity < 2 || arity > 3 ||
+		(arity==3 && ei_decode_atom(buf->buff, &buf->index, reg_name)) ||
+		ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) {
 		ei_x_encode_tuple_header(rbuf, 2);
 		ei_x_encode_atom(rbuf, "error");
 		ei_x_encode_atom(rbuf, "badarg");
@@ -535,7 +540,8 @@
 		switch_core_session_t *session;
 		if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) {
 			/* create a new session list element and attach it to this listener */
-			if (attach_call_to_registered_process(listener, reg_name, session)) {
+			if ((arity==2 && attach_call_to_pid(listener, &msg->from, session)) ||
+				(arity==3 && attach_call_to_registered_process(listener, reg_name, session))) {
 				ei_x_encode_atom(rbuf, "ok");
 			} else {
 				ei_x_encode_tuple_header(rbuf, 2);
@@ -551,6 +557,28 @@
 	return SWITCH_STATUS_SUCCESS;
 }
 
+/* Catch the response to ei_rpc_to, which comes back as {rex, {Ref, Pid}}.
+   If the next term in buf is a tuple (the {Ref,Pid} bit) it is handed to
+   handle_ref_tuple and that status is returned; any other term is logged
+   and SWITCH_STATUS_FALSE is returned. The arity argument is not used.
+ */
+static switch_status_t handle_msg_rpcresponse(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
+{
+	int type = 0, size = 0;
+
+	/* zero-initialised above so the switch never reads an unset type */
+	ei_get_type(buf->buff, &buf->index, &type, &size);
+	switch(type) {
+	case ERL_SMALL_TUPLE_EXT :
+	case ERL_LARGE_TUPLE_EXT :
+		return handle_ref_tuple(listener,msg,buf,rbuf);
+	default:
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unknown rpc response\n");
+		break;
+	}
+	return SWITCH_STATUS_FALSE;	/* no reply */
+}
+
 static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
 {
 	char tupletag[MAXATOMLEN];
@@ -583,7 +611,9 @@
 		} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
 			ret = handle_msg_bind(listener,msg,buf,rbuf);
 		} else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) {
-			ret = handle_msg_handlecall(listener,arity,buf,rbuf);
+			ret = handle_msg_handlecall(listener,msg,arity,buf,rbuf);
+		} else if (!strncmp(tupletag, "rex", MAXATOMLEN)) {
+			ret = handle_msg_rpcresponse(listener,msg,arity,buf,rbuf);
 		} else {
 			ei_x_encode_tuple_header(rbuf, 2);
 			ei_x_encode_atom(rbuf, "error");
@@ -701,7 +731,8 @@
 		switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
 	}
 
-	return SWITCH_STATUS_SUCCESS;
+	/* no reply */
+	return SWITCH_STATUS_FALSE;
 }
 
 

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c	Wed Jan 28 13:26:37 2009
@@ -187,8 +187,8 @@
 		l = lp;
 		lp = lp->next;
 
-		/* test all of the sessions attached to this event in case
-		   one of them should receive it as well
+		/* test all of the sessions attached to this listener in case
+		   one of them should receive the event as well
 		 */
 
 		send_event_to_attached_sessions(l,event);
@@ -684,9 +684,10 @@
 		ei_x_buff rbuf;
 		ei_x_new_with_version(&rbuf);
 
-		switch_mutex_lock(listener->sock_mutex);
+		/* do we need the mutex when reading? */
+		/*switch_mutex_lock(listener->sock_mutex);*/
 		status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 100);
-		switch_mutex_unlock(listener->sock_mutex);
+		/*switch_mutex_unlock(listener->sock_mutex);*/
 
 		switch(status) {
 			case ERL_TICK :
@@ -780,7 +781,9 @@
 					ei_x_encode_atom(&rbuf, "error");
 					ei_x_encode_atom(&rbuf, "acldeny");
 
+					switch_mutex_lock(listener->sock_mutex);
 					ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index);
+					switch_mutex_unlock(listener->sock_mutex);
 #ifdef EI_DEBUG
 					ei_x_print_msg(&rbuf, &msg.from, 1);
 #endif
@@ -1026,6 +1029,33 @@
 	return session_element;
 }
 
+/* Attach a session to this listener, recording pid as the Erlang process for
+   it. The element comes from the session's pool. Returns the new element, or
+   NULL if allocation or taking the session read lock failed (nothing attached). */
+session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session)
+{
+	session_elem_t* session_element = NULL;
+	if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n");
+		return NULL;
+	}
+	if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n");
+		/* report failure: an unattached element must not look like success */
+		return NULL;
+	}
+	session_element->session = session;
+	session_element->process.type = ERLANG_PID;
+	memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
+	switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
+	switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
+	switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
+	switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
+	/* attach the session to the listener */
+	add_session_elem_to_listener(listener,session_element);
+	ei_link(listener, ei_self(listener->ec), pid);	/* link from our own pid to the handler pid */
+	return session_element;
+}
 
 session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session)
 {
@@ -1039,7 +1069,7 @@
 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n");
 		}
 		else {
-			char *argv[1], hash[100];
+			char hash[100];
 			int i = 0;
 			session_element->session = session;
 			erlang_pid *pid;
@@ -1061,13 +1091,20 @@
 				ei_x_encode_atom(&rbuf, "new_pid");
 				ei_x_encode_ref(&rbuf, &ref);
 				ei_x_encode_pid(&rbuf, ei_self(listener->ec));
+				/* should lock with mutex? */
 				ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index);
 #ifdef EI_DEBUG
 				ei_x_print_reg_msg(&rbuf, module, 1);
 #endif
 				ei_x_free(&rbuf);
 			} else {
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function);
+				/* should lock with mutex? */
+				ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function);
+				/*
+				char *argv[1];
 				ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv);
+				*/
 			}
 
 			ei_hash_ref(&ref, hash);
@@ -1091,7 +1128,8 @@
 			switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
 			switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
 
-			ei_link(listener, ei_self(listener->ec), pid);
+			/* this hangs because it can never get hold of the socket mutex */
+			 ei_link(listener, ei_self(listener->ec), pid); 
 		}
 	}
 	return session_element;

Modified: freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h	Wed Jan 28 13:26:37 2009
@@ -192,6 +192,7 @@
 void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to);
 void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event);
 void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag);
+int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function);
 int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv);
 void ei_init_ref(struct ei_cnode_s *ec, erlang_ref *ref);
 void ei_x_print_reg_msg(ei_x_buff *buf, char *dest, int send);
@@ -215,6 +216,7 @@
 
 /* mod_erlang_event.c */
 session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session);
+session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session);
 session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session);
 
 /* For Emacs:



More information about the Freeswitch-svn mailing list