[Freeswitch-svn] [commit] r10132 - freeswitch/trunk/src/mod/event_handlers/mod_event_socket
Freeswitch SVN
anthm at freeswitch.org
Thu Oct 23 13:31:23 EDT 2008
Author: anthm
Date: Thu Oct 23 13:31:22 2008
New Revision: 10132
Modified:
freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c
Log:
add nifty new goodies
Modified: freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c
==============================================================================
--- freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c (original)
+++ freeswitch/trunk/src/mod/event_handlers/mod_event_socket/mod_event_socket.c Thu Oct 23 13:31:22 2008
@@ -47,7 +47,8 @@
LFLAG_FULL = (1 << 4),
LFLAG_MYEVENTS = (1 << 5),
LFLAG_SESSION = (1 << 6),
- LFLAG_ASYNC = (1 << 7)
+ LFLAG_ASYNC = (1 << 7),
+ LFLAG_STATEFUL = (1 << 8)
} event_flag_t;
typedef enum {
@@ -72,6 +73,9 @@
int lost_events;
int lost_logs;
int hup;
+ time_t last_flush;
+ uint32_t timeout;
+ uint32_t id;
switch_sockaddr_t *sa;
char remote_ip[50];
switch_port_t remote_port;
@@ -99,8 +103,21 @@
int threads;
char *acl[MAX_ACL];
uint32_t acl_count;
+ uint32_t id;
} prefs;
+
+static void remove_listener(listener_t *listener);
+
+/*
+ * Hand out the next listener id: bump the counter kept in prefs.id and
+ * return the new value.  listen_list.mutex is held around the update.
+ */
+static uint32_t next_id(void)
+{
+    uint32_t fresh;
+
+    switch_mutex_lock(listen_list.mutex);
+    prefs.id += 1;
+    fresh = prefs.id;
+    switch_mutex_unlock(listen_list.mutex);
+
+    return fresh;
+}
+
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password);
@@ -165,6 +182,14 @@
continue;
}
+ if (switch_test_flag(l, LFLAG_STATEFUL) && l->timeout && switch_timestamp(NULL) - l->last_flush > l->timeout) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Stateful Listener %u has expired\n", l->id);
+ remove_listener(l);
+ switch_thread_rwlock_unlock(l->rwlock);
+ switch_core_hash_destroy(&l->event_hash);
+ switch_core_destroy_memory_pool(&l->pool);
+ }
+
if (l->event_list[SWITCH_EVENT_ALL]) {
send = 1;
} else if ((l->event_list[event->event_id])) {
@@ -347,18 +372,6 @@
return SWITCH_STATUS_SUCCESS;
}
-SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
-{
- switch_application_interface_t *app_interface;
-
- /* connect my internal structure to the blank pointer passed to me */
- *module_interface = switch_loadable_module_create_module_interface(pool, modname);
- SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
-
- /* indicate that the module should continue to be loaded */
- return SWITCH_STATUS_SUCCESS;
-}
-
static void add_listener(listener_t *listener)
{
/* add me to the listeners so I get events */
@@ -386,6 +399,24 @@
switch_mutex_unlock(listen_list.mutex);
}
+
+/*
+ * Look up a listener by id.
+ *
+ * Walks listen_list.listeners while holding listen_list.mutex.  A listener
+ * whose id is 0 never matches.  On the first match the listener's rwlock is
+ * try-read-locked; the listener is returned only if that succeeds, and the
+ * read lock is NOT released here, so the caller must unlock it.
+ *
+ * Returns NULL when nothing matches or the lock attempt fails.
+ */
+static listener_t *find_listener(uint32_t id)
+{
+    listener_t *found = NULL;
+    listener_t *cur;
+
+    switch_mutex_lock(listen_list.mutex);
+
+    cur = listen_list.listeners;
+    while (cur) {
+        if (cur->id && cur->id == id) {
+            /* only the first match is considered, whether or not it can be locked */
+            if (switch_thread_rwlock_tryrdlock(cur->rwlock) == SWITCH_STATUS_SUCCESS) {
+                found = cur;
+            }
+            break;
+        }
+        cur = cur->next;
+    }
+
+    switch_mutex_unlock(listen_list.mutex);
+
+    return found;
+}
+
static void strip_cr(char *s)
{
char *p;
@@ -394,6 +425,242 @@
}
}
+
+/*
+ * Write a <listener> XML fragment describing the listener (its id, event
+ * format and timeout) to the stream.
+ */
+static void xmlize_listener(listener_t *listener, switch_stream_handle_t *stream)
+{
+    const char *format_name = "plain";
+
+    /* anything other than EVENT_FORMAT_XML is reported as "plain" */
+    if (listener->format == EVENT_FORMAT_XML) {
+        format_name = "xml";
+    }
+
+    stream->write_function(stream, " <listener>\n");
+    stream->write_function(stream, " <listen-id>%u</listen-id>\n", listener->id);
+    stream->write_function(stream, " <format>%s</format>\n", format_name);
+    stream->write_function(stream, " <timeout>%u</timeout>\n", listener->timeout);
+    stream->write_function(stream, " </listener>\n");
+}
+
+/*
+ * "event_manager" API: a web front end for stateful event listeners.
+ *
+ * The request is described by headers on stream->param_event:
+ *   http-host      must be present, otherwise the call is rejected as non-web
+ *   command        one of create-listener, destroy-listener, check-listener,
+ *                  exec-fsapi
+ *   format         optional; a value containing "xml" selects XML events,
+ *                  anything else selects plain (defaults to "xml")
+ *   events         (create-listener) space separated event names
+ *   listen-id      (destroy-listener / check-listener) id of the listener
+ *   fsapi-command  (exec-fsapi) API command to run
+ *   fsapi-args     (exec-fsapi) arguments for that command
+ *
+ * The reply is an XML document written to the stream.  The function always
+ * returns SWITCH_STATUS_SUCCESS; failures are reported inside the XML as
+ * <reply type="error">.
+ */
+SWITCH_STANDARD_API(event_manager_function)
+{
+    char *http = NULL;
+    char *wcmd = NULL;
+    char *format = NULL;
+    listener_t *listener = NULL;
+
+    if (stream->param_event) {
+        http = switch_event_get_header(stream->param_event, "http-host");
+        wcmd = switch_event_get_header(stream->param_event, "command");
+        format = switch_event_get_header(stream->param_event, "format");
+    }
+
+    /* http is only non-NULL when param_event exists, so param_event is safe to use below */
+    if (!http) {
+        stream->write_function(stream, "This is a web application.!\n");
+        return SWITCH_STATUS_SUCCESS;
+    }
+    stream->write_function(stream, "Content-Type: text/xml\n\n");
+
+    stream->write_function(stream, "<?xml version=\"1.0\"?>\n");
+    stream->write_function(stream, "<root>\n");
+
+    if (!wcmd) {
+        stream->write_function(stream, "<data><reply type=\"error\">Missing command parameter!</reply></data>\n");
+        goto end;
+    }
+
+    if (!format) {
+        format = "xml";
+    }
+
+    if (!strcasecmp(wcmd, "create-listener")) {
+        char *events = switch_event_get_header(stream->param_event, "events");
+        switch_memory_pool_t *pool;
+        char *next, *cur;
+        uint32_t key_count = 0;
+        uint8_t custom = 0;
+        char *edup;
+
+        if (switch_strlen_zero(events)) {
+            stream->write_function(stream, "<data><reply type=\"error\">Missing parameter!</reply></data>\n");
+            goto end;
+        }
+
+        /* Copy the list before building the listener so that an allocation
+           failure needs no teardown; the copy is split in place below. */
+        if (!(edup = strdup(events))) {
+            stream->write_function(stream, "<data><reply type=\"error\">Memory Error!</reply></data>\n");
+            goto end;
+        }
+
+        switch_core_new_memory_pool(&pool);
+        listener = switch_core_alloc(pool, sizeof(*listener));
+        listener->pool = pool;
+        switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
+        switch_core_hash_init(&listener->event_hash, listener->pool);
+        switch_set_flag(listener, LFLAG_AUTHED);
+        switch_set_flag(listener, LFLAG_STATEFUL);
+        switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener->pool);
+        switch_thread_rwlock_create(&listener->rwlock, listener->pool);
+        listener->id = next_id();
+        listener->timeout = 60;
+        listener->last_flush = switch_timestamp(NULL);
+
+        if (switch_stristr("xml", format)) {
+            listener->format = EVENT_FORMAT_XML;
+        } else {
+            listener->format = EVENT_FORMAT_PLAIN;
+        }
+
+        for (cur = edup; cur; cur = next) {
+            switch_event_types_t type;
+
+            if ((next = strchr(cur, ' '))) {
+                *next++ = '\0';
+            }
+
+            if (custom) {
+                /* every word after CUSTOM is recorded in event_hash instead of being looked up as an event name */
+                switch_core_hash_insert(listener->event_hash, cur, MARKER);
+            } else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) {
+                key_count++;
+                if (type == SWITCH_EVENT_ALL) {
+                    uint32_t x = 0;
+                    for (x = 0; x < SWITCH_EVENT_ALL; x++) {
+                        listener->event_list[x] = 1;
+                    }
+                }
+                if (type <= SWITCH_EVENT_ALL) {
+                    listener->event_list[type] = 1;
+                }
+                if (type == SWITCH_EVENT_CUSTOM) {
+                    custom++;
+                }
+            }
+        }
+
+        switch_safe_free(edup);
+
+        if (!key_count) {
+            /* nothing recognisable was requested: throw the half-built listener away */
+            switch_core_hash_destroy(&listener->event_hash);
+            switch_core_destroy_memory_pool(&listener->pool);
+            stream->write_function(stream, "<data><reply type=\"error\">No keywords supplied</reply></data>\n");
+            goto end;
+        }
+
+        switch_set_flag_locked(listener, LFLAG_EVENTS);
+        add_listener(listener);
+        stream->write_function(stream, "<data>\n");
+        stream->write_function(stream, " <reply type=\"success\">Listener %u Created</reply>\n", listener->id);
+        xmlize_listener(listener, stream);
+        stream->write_function(stream, "</data>\n");
+
+        goto end;
+    } else if (!strcasecmp(wcmd, "destroy-listener")) {
+        char *id = switch_event_get_header(stream->param_event, "listen-id");
+        /* a missing listen-id becomes 0, which find_listener never matches */
+        uint32_t idl = id ? (uint32_t) atol(id) : 0;
+
+        if ((listener = find_listener(idl))) {
+            remove_listener(listener);
+            stream->write_function(stream, "<data>\n <reply type=\"success\">listener %u destroyed</reply>\n", listener->id);
+            xmlize_listener(listener, stream);
+            stream->write_function(stream, "</data>\n");
+            /* drop the read lock taken by find_listener before tearing the listener down */
+            switch_thread_rwlock_unlock(listener->rwlock);
+            switch_core_hash_destroy(&listener->event_hash);
+            switch_core_destroy_memory_pool(&listener->pool);
+            goto end;
+        } else {
+            stream->write_function(stream, "<data><reply type=\"error\">Can't find listener</reply></data>\n");
+            goto end;
+        }
+
+    } else if (!strcasecmp(wcmd, "check-listener")) {
+        char *id = switch_event_get_header(stream->param_event, "listen-id");
+        /* a missing listen-id becomes 0, which find_listener never matches */
+        uint32_t idl = id ? (uint32_t) atol(id) : 0;
+        void *pop;
+
+        if (!(listener = find_listener(idl))) {
+            stream->write_function(stream, "<data><reply type=\"error\">Can't find listener</reply></data>\n");
+            goto end;
+        }
+
+        /* polling counts as activity: restart the expiry clock */
+        listener->last_flush = switch_timestamp(NULL);
+        stream->write_function(stream, "<data>\n <reply type=\"success\">Current Events Follow</reply>\n");
+        xmlize_listener(listener, stream);
+        stream->write_function(stream, "<events>\n");
+
+        /* drain everything currently queued without blocking */
+        while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+            switch_event_t *pevent = (switch_event_t *) pop;
+
+            if (listener->format == EVENT_FORMAT_PLAIN) {
+                switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
+                stream->write_function(stream, "<event type=\"plain\">\n%s</event>", listener->ebuf);
+            } else {
+                switch_xml_t xml;
+
+                if ((xml = switch_event_xmlize(pevent, "%s", ""))) {
+                    listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
+                    switch_xml_free(xml);
+                } else {
+                    stream->write_function(stream, "-ERR XML Error\n");
+                    /* the popped event is ours to free even though it could not be rendered */
+                    switch_event_destroy(&pevent);
+                    break;
+                }
+
+                stream->write_function(stream, "%s\n", listener->ebuf);
+            }
+
+            switch_safe_free(listener->ebuf);
+            switch_event_destroy(&pevent);
+        }
+
+        stream->write_function(stream, " </events>\n</data>\n");
+
+        /* release the read lock taken by find_listener */
+        switch_thread_rwlock_unlock(listener->rwlock);
+    } else if (!strcasecmp(wcmd, "exec-fsapi")) {
+        char *api_command = switch_event_get_header(stream->param_event, "fsapi-command");
+        char *api_args = switch_event_get_header(stream->param_event, "fsapi-args");
+        switch_event_t *event = NULL, *oevent;
+
+        if (!(api_command)) {
+            stream->write_function(stream, "<data><reply type=\"error\">INVALID API COMMAND!</reply></data>\n");
+            goto end;
+        }
+
+        if (switch_event_create(&event, SWITCH_EVENT_REQUEST_PARAMS) != SWITCH_STATUS_SUCCESS) {
+            stream->write_function(stream, "<data><reply type=\"error\">Memory Error!</reply></data>\n");
+            goto end;
+        }
+
+        stream->write_function(stream, "<data>\n <reply type=\"success\">Execute API Command</reply>\n<api-command>\n");
+        /* run the nested command with a fresh, empty param event, then put the caller's back */
+        oevent = stream->param_event;
+        stream->param_event = event;
+        switch_api_execute(api_command, api_args, NULL, stream);
+        stream->param_event = oevent;
+        /* the temporary event was created here and is released here */
+        switch_event_destroy(&event);
+        stream->write_function(stream, " </api-command>\n</data>");
+    } else {
+        stream->write_function(stream, "<data><reply type=\"error\">INVALID COMMAND!</reply></data>\n");
+    }
+
+ end:
+
+    stream->write_function(stream, "</root>\n\n");
+
+    return SWITCH_STATUS_SUCCESS;
+}
+
+
+/*
+ * Module load entry point for mod_event_socket.
+ * Creates the module interface and registers the "socket" application
+ * (socket_function) and the "event_manager" API (event_manager_function).
+ * Always returns SWITCH_STATUS_SUCCESS.
+ * NOTE(review): module_interface, pool and modname are not declared in this
+ * block; presumably they come from the SWITCH_MODULE_LOAD_FUNCTION macro and
+ * the module definition -- confirm.
+ */
+SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
+{
+ switch_application_interface_t *app_interface;
+ switch_api_interface_t *api_interface;
+
+ /* create the module interface and hand it back through module_interface */
+ *module_interface = switch_loadable_module_create_module_interface(pool, modname);
+ SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
+ SWITCH_ADD_API(api_interface, "event_manager", "event_manager", event_manager_function, "<web data>");
+
+ /* report success so that loading of the module continues */
+ return SWITCH_STATUS_SUCCESS;
+}
+
static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout)
{
switch_size_t mlen, bytes = 0;
More information about the Freeswitch-svn
mailing list