[Freeswitch-svn] [commit] r12846 - freeswitch/trunk/src

FreeSWITCH SVN anthm at freeswitch.org
Mon Mar 30 14:12:06 PDT 2009


Author: anthm
Date: Mon Mar 30 16:12:06 2009
New Revision: 12846

Log:
change blocking rtp to pseudo-blocking to avoid endlessly blocking reads and refactor jitter buffer

Modified:
   freeswitch/trunk/src/switch_apr.c
   freeswitch/trunk/src/switch_rtp.c

Modified: freeswitch/trunk/src/switch_apr.c
==============================================================================
--- freeswitch/trunk/src/switch_apr.c	(original)
+++ freeswitch/trunk/src/switch_apr.c	Mon Mar 30 16:12:06 2009
@@ -791,7 +791,13 @@
 
 SWITCH_DECLARE(switch_status_t) switch_poll(switch_pollfd_t *aprset, int32_t numsock, int32_t *nsds, switch_interval_time_t timeout)
 {
-	return apr_poll((apr_pollfd_t *)aprset, numsock, nsds, timeout);
+	apr_status_t st = apr_poll((apr_pollfd_t *)aprset, numsock, nsds, timeout);
+
+	if (st == APR_TIMEUP) {
+		st = SWITCH_STATUS_TIMEOUT;
+	}
+
+	return st;
 }
 
 SWITCH_DECLARE(switch_status_t) switch_socket_create_pollfd(switch_pollfd_t **poll, switch_socket_t *sock, int16_t flags, switch_memory_pool_t *pool)

Modified: freeswitch/trunk/src/switch_rtp.c
==============================================================================
--- freeswitch/trunk/src/switch_rtp.c	(original)
+++ freeswitch/trunk/src/switch_rtp.c	Mon Mar 30 16:12:06 2009
@@ -129,7 +129,9 @@
 	 * used.
 	 */
 	switch_socket_t *sock_input, *sock_output;
-
+	switch_pollfd_t *read_pollfd;
+	switch_pollfd_t *jb_pollfd;
+	
 	switch_sockaddr_t *local_addr;
 	rtp_msg_t send_msg;
 
@@ -188,7 +190,6 @@
 	switch_timer_t timer;
 	uint8_t ready;
 	uint8_t cn;
-	switch_time_t last_time;
 	stfu_instance_t *jb;
 	uint32_t max_missed_packets;
 	uint32_t missed_count;
@@ -598,6 +599,8 @@
 	if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK)) {
 		switch_socket_opt_set(rtp_session->sock_input, SWITCH_SO_NONBLOCK, TRUE);
 		switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_NOBLOCK);
+	} else {
+		switch_socket_create_pollfd(&rtp_session->read_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool);
 	}
 
 	status = SWITCH_STATUS_SUCCESS;
@@ -1013,7 +1016,14 @@
 
 SWITCH_DECLARE(switch_status_t) switch_rtp_activate_jitter_buffer(switch_rtp_t *rtp_session, uint32_t queue_frames)
 {
+	if (rtp_session->read_pollfd) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't use jitterbuffer without a timer.\n");
+		return SWITCH_STATUS_FALSE;
+	}
+
 	rtp_session->jb = stfu_n_init(queue_frames);
+	switch_socket_create_pollfd(&rtp_session->jb_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool);
+
 	return SWITCH_STATUS_SUCCESS;
 }
 
@@ -1364,11 +1374,15 @@
 static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_type, switch_frame_flag_t *flags, switch_io_flag_t io_flags)
 {
 	switch_size_t bytes = 0;
-	switch_status_t status;
+	switch_status_t status = SWITCH_STATUS_SUCCESS, poll_status = SWITCH_STATUS_SUCCESS;
 	int check = 0;
 	stfu_frame_t *jb_frame;
 	int ret = -1;
 	int sleep_mss = 1000;
+	int poll_sec = 5;
+	int poll_loop = 0;
+	int fdr = 0;
+	int from_jb = 0;
 
 	if (!switch_rtp_ready(rtp_session)) {
 		return -1;
@@ -1376,8 +1390,6 @@
 
 	if (rtp_session->timer.interval) {
 		sleep_mss = rtp_session->timer.interval * 1000;
-	} else {
-		rtp_session->last_time = switch_time_now();
 	}
 
 	READ_INC(rtp_session);
@@ -1386,13 +1398,54 @@
 		int do_cng = 0;
 
 		if (rtp_session->timer.interval) {
-			switch_core_timer_next(&rtp_session->timer);
+			int do_sleep = 1;
+			if (rtp_session->jb) {
+				if (switch_poll(rtp_session->jb_pollfd, 1, &fdr, 1) == SWITCH_STATUS_SUCCESS) {
+					do_sleep = 0;
+				}
+			}
+			if (do_sleep) {
+				switch_core_timer_next(&rtp_session->timer);
+			}
 		}
 
 	recvfrom:
 
-		bytes = sizeof(rtp_msg_t);
-		status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes);
+		if (rtp_session->jb) {
+			if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) {
+				memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen);
+				if (jb_frame->plc) {
+					*flags |= SFF_PLC;
+				}
+				bytes = jb_frame->dlen + rtp_header_len;
+				rtp_session->recv_msg.header.ts = htonl(jb_frame->ts);
+				rtp_session->recv_msg.header.pt = rtp_session->payload;
+				from_jb = 1;
+				goto post_read;
+			}
+		}
+
+		
+		if (rtp_session->read_pollfd) {
+			poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, poll_sec * 1000000);
+		} 
+
+
+		if (poll_status == SWITCH_STATUS_SUCCESS) {
+			bytes = sizeof(rtp_msg_t);
+			status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes);
+		} else {
+			if (!SWITCH_STATUS_IS_BREAK(poll_status) && poll_status != SWITCH_STATUS_TIMEOUT) {
+				ret = -1;
+				goto end;
+			}
+
+			poll_loop = 1;
+			rtp_session->missed_count += (poll_sec * 1000 ) / (rtp_session->ms_per_packet / 1000);
+			bytes = 0;
+		}
+
+	post_read:
 
 		if (bytes < 0) {
 			ret = (int) bytes;
@@ -1427,6 +1480,10 @@
 			continue;
 		}
 
+		if (!bytes && poll_loop) {
+			goto recvfrom;
+		}
+		
 		if (bytes && rtp_session->recv_msg.header.m && rtp_session->recv_msg.header.pt != rtp_session->te) {
 			rtp_flush_read_buffer(rtp_session);
 		}
@@ -1517,26 +1574,16 @@
 			goto end;
 		}
 
-		if (rtp_session->jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) {
+		if (rtp_session->jb && !from_jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) {
 			if (bytes) {
+
 				if (rtp_session->recv_msg.header.m) {
 					stfu_n_reset(rtp_session->jb);
 				}
 
-				stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len);
+				stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len);				
 				bytes = 0;
-			}
-
-			if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) {
-				memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen);
-				if (jb_frame->plc) {
-					*flags |= SFF_PLC;
-				}
-				bytes = jb_frame->dlen + rtp_header_len;
-				rtp_session->recv_msg.header.ts = htonl(jb_frame->ts);
-				rtp_session->recv_msg.header.pt = rtp_session->payload;
-			} else {
-				goto timer_check;
+				goto recvfrom;
 			}
 		}
 



More information about the Freeswitch-svn mailing list