[Freeswitch-users] Asynchronous communication with FreeSWITCH's mod_event_socket

Richard Open Source oss.richard at gmail.com
Fri Sep 19 07:00:22 PDT 2008


oops...sorry for the mess...


import org.apache.mina.common.IoSession
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit



class Session {
       private IoSession session
        private FSEventHandler handler
        private BlockingQueue<String> msgQ
        private final ExecutorService executor =
Executors.newSingleThreadExecutor()
       private final static long DEFAULT_TIMEOUT = 5000

        def data

        def Session(IoSession s, BlockingQueue q) {
                session = s
                msgQ = q
        }

        private def executeAndWait(Closure task, long timeout=0) {
        Future <CommandResult> f = executor.submit(task as Callable)
        def result
        def boolean success = false
                try {
                        if (timeout != 0) {
                                result = f.get(timeout, TimeUnit.MILLISECONDS)
                        } else {
                                result = f.get()
                        }
                        if (result.code == CommandResult.OK) data = result.data
                } catch (ExecutionException e) {
                        // Should log here
                } catch (TimeoutException e) {
                        f.cancel(true)
                }

               return result
        }

        def answer() {
                def task = {
                        def done = false
                        def r = new CommandResult()
                        sendMessage("answer")
                       while (! done) {
                               def m = msgQ.take()
                                if ((m?.event?.Name ==
"CHANNEL_EXECUTE_COMPLETE") && (m?.Application == "answer")) {
                                        done = true
                                        r.code = CommandResult.OK
                                        r.data = m
                                }
                        }
                        return r
                }
        executeAndWait(task, DEFAULT_TIMEOUT)
        }

        def unset(var) {
                def task = {
                                def done = false
                                def r = new CommandResult()
                                sendMessage("unset", var)
                                while (! done) {
                                        def m = msgQ.take()
                                        if ((m?.event?.Name ==
"CHANNEL_EXECUTE_COMPLETE")
                                                        &&
(m?.Application == "unset")
                                                        &&
(m?.ApplicationData == var)) {
                                                done = true
                                                r.code = CommandResult.OK
                                                r.data = m
                                        }
                                }
                                return r
                        }
                executeAndWait(task, DEFAULT_TIMEOUT)
        }

        def queueDtmf(dtmfs) {
                def task = {
                                def done = false
                                def r = new CommandResult()
                                sendMessage("queue_dtmf", dtmfs)
                                while (! done) {
                                        def m = msgQ.take()
                                        if ((m?.event?.Name ==
"CHANNEL_EXECUTE_COMPLETE")
                                                        &&
(m?.Application == "queue_dtmf")
                                                        &&
(m?.ApplicationData == dtmfs)) {
                                                done = true
                                                r.code = CommandResult.OK
                                                r.data = m
                                        }
                                }
                                return r
                        }
                executeAndWait(task, DEFAULT_TIMEOUT)
        }

/*
        def hangup() {
                def task = {
                                def done = false
                                def r = new CommandResult()
                                sendMessage("hangup")
                                while (! done) {
                                        def m = msgQ.take()
                                        if ((m?.event?.Name ==
"CHANNEL_EXECUTE_COMPLETE")
                                                       &&
(m?.Application == "queue_dtmf")
                                                        &&
(m?.ApplicationData == dtmfs)) {
                                                done = true
                                                r.code = CommandResult.OK
                                                r.data = m
                                        }
                               }
                                return r
                       }
                executeAndWait(task, DEFAULT_TIMEOUT)
        }
*/

       def setVariable(String var, String value) {
                def task = {
                                def done = false
                                def r = new CommandResult()
                                sendMessage("set", "${var}=${value}")
                                while (! done) {
                                        def m = msgQ.take()
                                        if ((m?.event?.Name ==
"CHANNEL_EXECUTE_COMPLETE")
                                                        &&
(m?.Application == "set")
                                                        &&
(m?.ApplicationData == "${var}=${value}")) {
                                               done = true
                                                r.code = CommandResult.OK
                                                r.data = m
                                        }
                                }
                               return r
                        }
                executeAndWait(task, DEFAULT_TIMEOUT)

        }

        def export(String var) {
                set("export_vars", var)
        }

        def bridge(String number) {
                def task = {
                                def done = false
                                def r = new CommandResult()
                                sendMessage("bridge", number)
                                while (! done) {
                                        def m = msgQ.take()
//                                      println m
                                        if (((m?.event?.Name ==
"CHANNEL_EXECUTE_COMPLETE")
                                                        &&
(m?.Application == "bridge")
                                                        &&
(m?.ApplicationData == number)) ||

((m?.event?.Name == "CHANNEL_UNBRIDGE")
                                                        &&
(m?.variable.bridge_channel == number)) || (m?.SESSIONCLOSED ==
"true")) {
                                                done = true
                                               r.code = CommandResult.OK
                                                r.data = m
                                        }
                                }
                                return r
                        }
                executeAndWait(task)
        }

        def promptForDigits(int min, int max, String soundFile, String
variableName, long timeout, String terminator) {
                def task = {
                                def done = false
                                def r = new CommandResult()
                                def appData = "${min} ${max}
${soundFile} ${variableName} ${timeout} ${terminator}"
                                sendMessage("read", appData, true)
                                while (! done) {
                                        def m = msgQ.take()
                                       if ((m?.event?.Name ==
"CHANNEL_EXECUTE_COMPLETE")
                                                        &&
(m?.Application == "read")
                                                        &&
(m?.ApplicationData == appData)) {
                                                done = true
                                                r.code =
CommandResult.OK                                                r.data
= m
                                        }
                                }
                                return r
                        }
                executeAndWait(task, timeout)
        }

        def originate(String url) {
                sendMessage("originate", url, true)
        }

        def sleep(int sec) {
                sendMessage("sleep", new Integer(sec*1000).toString())
        }

        def say(String phrase) {
                sendMessage("phrase", "spell,$phrase")
       }

        def script() {
                return "jeprox"
       }

        private def sendMessage(String app, String arg=null, boolean
event_lock=false) {
               String msg = "sendmsg\ncall-command:
execute\nexecute-app-name: ${app}"
                if (arg) {
                        msg += "\nexecute-app-arg: ${arg}"
                }
                if (event_lock) {
                        msg += "\nevent-lock: ${event_lock}"
                }
                System.out.println(msg)
                session.write("$msg\n\n")
        }
}

On Fri, Sep 19, 2008 at 9:35 AM, Richard Open Source
<oss.richard at gmail.com> wrote:
>
> I agree.
>
> I have started working on a library like asterisk-java (now onhold but hopefully can continue working on it in a couple of weeks) using groovy (will name it fs-groovy :))
>
> The way I check if the command was successful is to look for the Event with CHANNEL_EXECUTE_COMPLETE and Application and ApplicationData.
>
> Here is how I implemented it. Still needs more work though.
> --
> On Thu, Sep 18, 2008 at 5:49 PM, Luke Graybill <killarny at gmail.com> wrote:
>>
>> I've been doing a lot of work recently with FreeSWITCH's mod_event_socket, and I wanted to comment a bit about the syntax used for commands through the socket while using asynchronous mode. I haven't tried the synchronous mode yet, as I always want to be free to be able to execute commands without waiting for other commands to finish. For instance, I need to be able to collect DTMF events while I'm playing a sound file, so that the user can do things like select menu items without listening to the entire menu first.
>>
>> Asterisk's AMI protocol allows you to specify an ActionID along with every command that you send. Asterisk then includes this ActionID with every event that is related to that command, making it cake to coordinate an asynchronous client.
>>
>> However, even in async mode, FreeSWITCH's mod_event_socket doesn't communicate any identifying information for command responses, or for events triggered by a previous command, unless one uses the bgapi command set. This command set is not applicable to every situation, though. It only applies to commands which manipulate the call; if one needs to manipulate the channel, then messages must be used through the sendmsg command set, which doesn't provide any specific identifying information.
>>
>> Now, to complicate things, with all commands (even bgapi) the protocol works something like this: you send a command, and wait for a response from mod_event_socket. This response is assumed to be immediate before anything else the client might receive from mod_event_socket, and in the case of bgapi, this response will contain a job-id to use for comparing job-related events later.
>>
>> For example, for the following command:
>>
>> sendmsg
>> call-command: execute
>> execute-app-name: answer\n\n
>>
>> The response is this:
>>
>> Content-Type: command/reply
>> Reply-Text: +OK
>>
>> That response is generic to nearly every single command sent, and is only really saying "The last transmission was a valid command, and didn't immediately fail". The command may actually fail later, and command specific feedback is generally contained in later events (which have no unique identifying information).
>>
>> My issue here is that this seems to be forcing an asynchronous client to rely upon a synchronous ordering of response directly following command, thus violating the very concepts of an asynchronous protocol in which there should be no assumed order. Not only that, but this method increases the complexity of a client, which must be aware of limitations that wouldn't ordinarily be required by a true asynchronous protocol. An asynchronous client should be unconcerned with listening for a synchronous response to every command.
>>
>> My suggested solution is to apply the job-id concept from bgapi to messages as well, and to go a step further; borrow the Asterisk idea of transmitting an identifier along with each command. Every response and event related to that command should then contain the very same identifier in the header.
>> _______________________________________________
>> Freeswitch-users mailing list
>> Freeswitch-users at lists.freeswitch.org
>> http://lists.freeswitch.org/mailman/listinfo/freeswitch-users
>> UNSUBSCRIBE:http://lists.freeswitch.org/mailman/options/freeswitch-users
>> http://www.freeswitch.org
>>
>




More information about the FreeSWITCH-users mailing list