[Freeswitch-svn] [commit] r10132 - freeswitch/trunk/src/mod/event_handlers/mod_event_socket

Freeswitch SVN anthm at freeswitch.org
Thu Oct 23 13:31:23 EDT 2008


Author: anthm
Date: Thu Oct 23 13:31:22 2008
New Revision: 10132

Modified:
   freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c

Log:
add nifty new goodies

Modified: freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c	(original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c	Thu Oct 23 13:31:22 2008
@@ -47,7 +47,8 @@
 	LFLAG_FULL = (1 << 4),
 	LFLAG_MYEVENTS = (1 << 5),
 	LFLAG_SESSION = (1 << 6),
-	LFLAG_ASYNC = (1 << 7)
+	LFLAG_ASYNC = (1 << 7),
+	LFLAG_STATEFUL = (1 << 8)
 } event_flag_t;
 
 typedef enum {
@@ -72,6 +73,9 @@
 	int lost_events;
 	int lost_logs;
 	int hup;
+	time_t last_flush;
+	uint32_t timeout;
+	uint32_t id;
 	switch_sockaddr_t *sa;
 	char remote_ip[50];
 	switch_port_t remote_port;
@@ -99,8 +103,21 @@
 	int threads;
 	char *acl[MAX_ACL];
 	uint32_t acl_count;
+	uint32_t id;
 } prefs;
 
+
+static void remove_listener(listener_t *listener);
+
+static uint32_t next_id(void)
+{
+	uint32_t id;
+	switch_mutex_lock(listen_list.mutex);
+    id = ++prefs.id;
+    switch_mutex_unlock(listen_list.mutex);
+	return id;
+}
+
 SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
 SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password);
 
@@ -165,6 +182,14 @@
 			continue;
 		}
 
+		if (switch_test_flag(l, LFLAG_STATEFUL) && l->timeout && switch_timestamp(NULL) - l->last_flush > l->timeout) {
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Stateful Listener %u has expired\n", l->id);
+			remove_listener(l);
+			switch_thread_rwlock_unlock(l->rwlock);
+			switch_core_hash_destroy(&l->event_hash);
+			switch_core_destroy_memory_pool(&l->pool);
+		}
+
 		if (l->event_list[SWITCH_EVENT_ALL]) {
 			send = 1;
 		} else if ((l->event_list[event->event_id])) {
@@ -347,18 +372,6 @@
 	return SWITCH_STATUS_SUCCESS;
 }
 
-SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
-{
-	switch_application_interface_t *app_interface;
-
-	/* connect my internal structure to the blank pointer passed to me */
-	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
-	SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
-
-	/* indicate that the module should continue to be loaded */
-	return SWITCH_STATUS_SUCCESS;
-}
-
 static void add_listener(listener_t *listener)
 {
 	/* add me to the listeners so I get events */
@@ -386,6 +399,24 @@
 	switch_mutex_unlock(listen_list.mutex);
 }
 
+
+static listener_t *find_listener(uint32_t id)
+{
+	listener_t *l, *r = NULL;
+
+	switch_mutex_lock(listen_list.mutex);
+	for (l = listen_list.listeners; l; l = l->next) {
+		if (l->id && l->id == id) {
+			if (switch_thread_rwlock_tryrdlock(l->rwlock) == SWITCH_STATUS_SUCCESS) {
+				r = l;
+			}
+			break;
+		}
+	}
+	switch_mutex_unlock(listen_list.mutex);
+	return r;
+}
+
 static void strip_cr(char *s)
 {
 	char *p;
@@ -394,6 +425,242 @@
 	}
 }
 
+
+static void xmlize_listener(listener_t *listener, switch_stream_handle_t *stream)
+{
+	stream->write_function(stream, " <listener>\n");
+	stream->write_function(stream, "  <listen-id>%u</listen-id>\n", listener->id);
+	stream->write_function(stream, "  <format>%s</format>\n", listener->format == EVENT_FORMAT_XML ? "xml" : "plain");
+	stream->write_function(stream, "  <timeout>%u</timeout>\n", listener->timeout);
+	stream->write_function(stream, " </listener>\n");
+}
+
+SWITCH_STANDARD_API(event_manager_function)
+{
+	char *http = NULL;
+	char *wcmd = NULL;
+	char *format = NULL;
+	listener_t *listener = NULL;
+
+	if (stream->param_event) {
+        http = switch_event_get_header(stream->param_event, "http-host");
+		wcmd = switch_event_get_header(stream->param_event, "command");
+		format = switch_event_get_header(stream->param_event, "format");
+    }
+
+	if (!http) {
+		stream->write_function(stream, "This is a web application.!\n");
+        return SWITCH_STATUS_SUCCESS;
+	}
+	stream->write_function(stream, "Content-Type: text/xml\n\n");
+	
+	stream->write_function(stream, "<?xml version=\"1.0\"?>\n");
+	stream->write_function(stream, "<root>\n");
+	
+	if (!wcmd) {
+		stream->write_function(stream, "<data><reply type=\"error\">Missing command parameter!</reply></data>\n");
+		goto end;
+	}
+	
+	if (!format) {
+		format = "xml";
+	}
+
+	
+	
+	if (!strcasecmp(wcmd, "create-listener")) {
+		char *events = switch_event_get_header(stream->param_event, "events");
+		switch_memory_pool_t *pool;
+		char *next, *cur;
+		uint32_t count = 0, key_count = 0;
+		uint8_t custom = 0;
+		char *edup;
+		
+		if (switch_strlen_zero(events)) {
+			stream->write_function(stream, "<data><reply type=\"error\">Missing parameter!</reply></data>\n");
+			goto end;
+		}
+
+		switch_core_new_memory_pool(&pool);
+		listener = switch_core_alloc(pool, sizeof(*listener));
+		listener->pool = pool;
+		listener->format = EVENT_FORMAT_PLAIN;
+		switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+		switch_core_hash_init(&listener->event_hash, listener->pool);
+		switch_set_flag(listener, LFLAG_AUTHED);
+		switch_set_flag(listener, LFLAG_STATEFUL);
+		switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener->pool);
+		switch_thread_rwlock_create(&listener->rwlock, listener->pool);
+		listener->id = next_id();
+		listener->timeout = 60;
+		listener->last_flush = switch_timestamp(NULL);
+		
+		if (switch_stristr("xml", format)) {
+			listener->format = EVENT_FORMAT_XML;
+		} else {
+			listener->format = EVENT_FORMAT_PLAIN;
+		}
+
+		edup = strdup(events);
+		
+		for (cur = edup; cur; count++) {
+			switch_event_types_t type;
+
+			if ((next = strchr(cur, ' '))) {
+				*next++ = '\0';
+			}
+
+			if (custom) {
+				switch_core_hash_insert(listener->event_hash, cur, MARKER);
+			} else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) {
+				key_count++;
+				if (type == SWITCH_EVENT_ALL) {
+					uint32_t x = 0;
+					for (x = 0; x < SWITCH_EVENT_ALL; x++) {
+						listener->event_list[x] = 1;
+					}
+				}
+				if (type <= SWITCH_EVENT_ALL) {
+					listener->event_list[type] = 1;
+				}
+				if (type == SWITCH_EVENT_CUSTOM) {
+					custom++;
+				}
+			}
+
+			cur = next;
+		}
+		
+
+		switch_safe_free(edup);
+
+		if (!key_count) {
+			switch_core_hash_destroy(&listener->event_hash);
+			switch_core_destroy_memory_pool(&listener->pool);
+			stream->write_function(stream, "<data><reply type=\"error\">No keywords supplied</reply></data>\n");
+			goto end;
+		}
+
+
+		switch_set_flag_locked(listener, LFLAG_EVENTS);
+		add_listener(listener);
+		stream->write_function(stream, "<data>\n");
+		stream->write_function(stream, " <reply type=\"success\">Listener %u Created</reply>\n", listener->id);
+		xmlize_listener(listener, stream);
+		stream->write_function(stream, "</data>\n");
+
+		goto end;
+	} else if (!strcasecmp(wcmd, "destroy-listener")) {		
+		char *id = switch_event_get_header(stream->param_event, "listen-id");
+		uint32_t idl = (uint32_t) atol(id);
+
+		if ((listener = find_listener(idl))) {
+			remove_listener(listener);
+			stream->write_function(stream, "<data>\n <reply type=\"success\">listener %u destroyed</reply>\n", listener->id);
+			xmlize_listener(listener, stream);
+			stream->write_function(stream, "</data>\n");
+			switch_thread_rwlock_unlock(listener->rwlock);
+			switch_core_hash_destroy(&listener->event_hash);
+			switch_core_destroy_memory_pool(&listener->pool);
+			goto end;
+		} else {
+			stream->write_function(stream, "<data><reply type=\"error\">Can't find listener</reply></data>\n");
+			goto end;
+		}
+
+	} else if (!strcasecmp(wcmd, "check-listener")) {
+		char *id = switch_event_get_header(stream->param_event, "listen-id");
+		uint32_t idl = (uint32_t) atol(id);
+		void *pop;
+		switch_event_t *pevent;
+		
+		if (!(listener = find_listener(idl))) {
+			stream->write_function(stream, "<data><reply type=\"error\">Can't find listener</reply></data>\n");
+			goto end;
+		}
+
+		listener->last_flush = switch_timestamp(NULL);
+		stream->write_function(stream, "<data>\n <reply type=\"success\">Current Events Follow</reply>\n");			
+		xmlize_listener(listener, stream);
+		stream->write_function(stream, "<events>\n");			
+
+		while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+			pevent = (switch_event_t *) pop;
+			char *etype;
+
+			if (listener->format == EVENT_FORMAT_PLAIN) {
+				etype = "plain";
+				switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
+				stream->write_function(stream, "<event type=\"plain\">\n%s</event>", listener->ebuf);
+			} else {
+				switch_xml_t xml;
+				etype = "xml";
+
+				if ((xml = switch_event_xmlize(pevent, "%s", ""))) {
+					listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
+					switch_xml_free(xml);
+				} else {
+					stream->write_function(stream, "-ERR XML Error\n");
+					break;
+				}
+
+				stream->write_function(stream, "%s\n", listener->ebuf);
+			}
+			
+			switch_safe_free(listener->ebuf);
+			switch_event_destroy(&pevent);
+		}
+
+		stream->write_function(stream, " </events>\n</data>\n");
+
+		if (pevent) {
+			switch_event_destroy(&pevent);
+		}
+
+		switch_thread_rwlock_unlock(listener->rwlock);
+	} else if (!strcasecmp(wcmd, "exec-fsapi")) {
+		char *api_command = switch_event_get_header(stream->param_event, "fsapi-command");
+		char *api_args = switch_event_get_header(stream->param_event, "fsapi-args");
+		switch_event_t *event, *oevent;
+
+		if (!(api_command)) {
+			stream->write_function(stream, "<data><reply type=\"error\">INVALID API COMMAND!</reply></data>\n");
+			goto end;
+		}
+
+		stream->write_function(stream, "<data>\n <reply type=\"success\">Execute API Command</reply>\n<api-command>\n");
+		switch_event_create(&event, SWITCH_EVENT_REQUEST_PARAMS);
+		oevent = stream->param_event;
+		stream->param_event = event;
+		switch_api_execute(api_command, api_args, NULL, stream);
+		stream->param_event = oevent;
+		stream->write_function(stream, " </api-command>\n</data>");
+	} else {
+		stream->write_function(stream, "<data><reply type=\"error\">INVALID COMMAND!</reply></data\n");
+	}
+
+ end:
+
+	stream->write_function(stream, "</root>\n\n");
+	
+	return SWITCH_STATUS_SUCCESS;
+}
+
+
+SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
+{
+	switch_application_interface_t *app_interface;
+	switch_api_interface_t *api_interface;
+
+	/* connect my internal structure to the blank pointer passed to me */
+	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
+	SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
+	SWITCH_ADD_API(api_interface, "event_manager", "event_manager", event_manager_function, "<web data>");
+
+	/* indicate that the module should continue to be loaded */
+	return SWITCH_STATUS_SUCCESS;
+}
+
 static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout)
 {
 	switch_size_t mlen, bytes = 0;



More information about the Freeswitch-svn mailing list