[Freeswitch-svn] [commit] r7285 - freeswitch/trunk/src

Freeswitch SVN anthm at freeswitch.org
Fri Jan 18 13:14:53 EST 2008


Author: anthm
Date: Fri Jan 18 13:14:53 2008
New Revision: 7285

Modified:
   freeswitch/trunk/src/switch_rtp.c

Log:
tighten up rtp

Modified: freeswitch/trunk/src/switch_rtp.c
==============================================================================
--- freeswitch/trunk/src/switch_rtp.c	(original)
+++ freeswitch/trunk/src/switch_rtp.c	Fri Jan 18 13:14:53 2008
@@ -44,6 +44,10 @@
 #include <datatypes.h>
 #include <srtp.h>
 
+#define READ_INC(rtp_session) switch_mutex_lock(rtp_session->read_mutex); rtp_session->reading++
+#define READ_DEC(rtp_session)  switch_mutex_unlock(rtp_session->read_mutex); rtp_session->reading--
+#define WRITE_INC(rtp_session)  switch_mutex_lock(rtp_session->write_mutex); rtp_session->writing++
+#define WRITE_DEC(rtp_session) switch_mutex_unlock(rtp_session->write_mutex); rtp_session->writing--
 
 #include "stfu.h"
 
@@ -167,6 +171,8 @@
 	switch_payload_t te;
 	switch_payload_t cng_pt;
 	switch_mutex_t *flag_mutex;
+	switch_mutex_t *read_mutex;
+	switch_mutex_t *write_mutex;
 	switch_timer_t timer;
 	uint8_t ready;
 	uint8_t cn;
@@ -176,6 +182,8 @@
 	uint32_t missed_count;
 	rtp_msg_t write_msg;
 	switch_rtp_crypto_key_t *crypto_keys[SWITCH_RTP_CRYPTO_MAX];
+	int reading;
+	int writing;
 };
 
 static int global_init = 0;
@@ -194,13 +202,16 @@
 	switch_stun_packet_t *packet;
 	unsigned int elapsed;
 	switch_size_t bytes;
+	switch_status_t status = SWITCH_STATUS_SUCCESS;
 
+	WRITE_INC(rtp_session);
+	
 	switch_assert(rtp_session != NULL);
 	switch_assert(rtp_session->ice_user != NULL);
 
 	if (rtp_session->stuncount != 0) {
 		rtp_session->stuncount--;
-		return SWITCH_STATUS_SUCCESS;
+		goto end;
 	}
 
 	if (rtp_session->last_stun) {
@@ -208,7 +219,8 @@
 
 		if (elapsed > 30000) {
 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No stun for a long time (PUNT!)\n");
-			return SWITCH_STATUS_FALSE;
+			status = SWITCH_STATUS_FALSE;
+			goto end;
 		}
 	}
 
@@ -218,6 +230,10 @@
 	switch_socket_sendto(rtp_session->sock, rtp_session->remote_addr, 0, (void *) packet, &bytes);
 	rtp_session->stuncount = 25;
 
+ end:
+	WRITE_DEC(rtp_session);
+	
+
 	return SWITCH_STATUS_SUCCESS;
 }
 
@@ -228,6 +244,10 @@
 	char username[33] = { 0 };
 	unsigned char buf[512] = { 0 };
 	switch_size_t cpylen = len;
+
+
+	READ_INC(rtp_session);
+	WRITE_INC(rtp_session);
 	
 	if (cpylen > 512) {
 		cpylen = 512;
@@ -271,6 +291,9 @@
 		bytes = switch_stun_packet_length(rpacket);
 		switch_socket_sendto(rtp_session->sock, rtp_session->from_addr, 0, (void *) rpacket, &bytes);
 	}
+
+	READ_DEC(rtp_session);
+	WRITE_DEC(rtp_session);
 }
 
 
@@ -383,6 +406,9 @@
 	int x;
 #endif
 
+	WRITE_INC(rtp_session);
+	READ_INC(rtp_session);
+
 	*err = NULL;
 
 	if (switch_sockaddr_info_get(&rtp_session->local_addr, host, SWITCH_UNSPEC, port, 0, rtp_session->pool) != SWITCH_STATUS_SUCCESS) {
@@ -460,6 +486,9 @@
 	if (old_sock) {
 		switch_socket_close(old_sock);
 	}
+	
+	WRITE_DEC(rtp_session);
+	READ_DEC(rtp_session);
 
 	return status;
 }
@@ -609,8 +638,11 @@
 
 	rtp_session->pool = pool;
 	rtp_session->te = 101;
+	rtp_session->ready = 1;
 
 	switch_mutex_init(&rtp_session->flag_mutex, SWITCH_MUTEX_NESTED, pool);
+	switch_mutex_init(&rtp_session->read_mutex, SWITCH_MUTEX_NESTED, pool);
+	switch_mutex_init(&rtp_session->write_mutex, SWITCH_MUTEX_NESTED, pool);
 	switch_mutex_init(&rtp_session->dtmf_data.dtmf_mutex, SWITCH_MUTEX_NESTED, pool);
 	switch_queue_create(&rtp_session->dtmf_data.dtmf_queue, 100, rtp_session->pool);
 	switch_queue_create(&rtp_session->dtmf_data.dtmf_inqueue, 100, rtp_session->pool);
@@ -703,7 +735,7 @@
  end:
 
 	if (rtp_session) {
-		rtp_session->ready = 1;
+		rtp_session->ready = 2;
 		rtp_session->rx_host = switch_core_strdup(rtp_session->pool, rx_host);
 		rtp_session->rx_port = rx_port;
 	} else {
@@ -755,27 +787,42 @@
 	switch_assert(rtp_session != NULL);
 	switch_mutex_lock(rtp_session->flag_mutex);
 	if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO)) {
+		switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_IO);
 		switch_assert(rtp_session->sock != NULL);
 		switch_socket_shutdown(rtp_session->sock, SWITCH_SHUTDOWN_READWRITE);
-		switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_IO);
 	}
 	switch_mutex_unlock(rtp_session->flag_mutex);
 }
 
 SWITCH_DECLARE(uint8_t) switch_rtp_ready(switch_rtp_t *rtp_session)
 {
-	return (rtp_session != NULL && rtp_session->sock && rtp_session->ready) ? 1 : 0;
+	return (rtp_session != NULL && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) && rtp_session->sock && rtp_session->ready == 2) ? 1 : 0;
 }
 
 SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session)
 {
 	void *pop;
 	switch_socket_t *sock;
+	int sanity = 0;
 
-	if (!switch_rtp_ready(*rtp_session)) {
+	switch_mutex_lock((*rtp_session)->flag_mutex);
+
+	if (!rtp_session || !*rtp_session || !(*rtp_session)->ready) {
 		return;
 	}
 
+	(*rtp_session)->ready = 0;
+
+
+	while((*rtp_session)->reading || (*rtp_session)->writing) {
+		switch_yield(10000);
+		if (++sanity > 1000) {
+			break;
+		}
+	}
+
+	switch_rtp_kill_socket(*rtp_session);
+
 	while(switch_queue_trypop((*rtp_session)->dtmf_data.dtmf_inqueue, &pop) == SWITCH_STATUS_SUCCESS) {
 		free(pop);
 	}
@@ -783,22 +830,16 @@
 	while(switch_queue_trypop((*rtp_session)->dtmf_data.dtmf_queue, &pop) == SWITCH_STATUS_SUCCESS) {
 		free(pop);
 	}
-	
-	(*rtp_session)->ready = 0;
-
-	switch_mutex_lock((*rtp_session)->flag_mutex);
 
 	if ((*rtp_session)->jb) {
 		stfu_n_destroy(&(*rtp_session)->jb);
 	}
 
-	switch_rtp_kill_socket(*rtp_session);
 	sock = (*rtp_session)->sock;
 	(*rtp_session)->sock = NULL;
 	switch_socket_close(sock);
 
 
-
 	if (switch_test_flag((*rtp_session), SWITCH_RTP_FLAG_VAD)) {
 		switch_rtp_disable_vad(*rtp_session);
 	}
@@ -824,8 +865,8 @@
 	}
 
 	switch_rtp_release_port((*rtp_session)->rx_host, (*rtp_session)->rx_port);
-
 	switch_mutex_unlock((*rtp_session)->flag_mutex);
+
 	return;
 }
 
@@ -993,11 +1034,14 @@
 	switch_status_t status;
 	uint8_t check = 1;
 	stfu_frame_t *jb_frame;
+	int ret = -1;
 	
 	if (!rtp_session->timer.interval) {
 		rtp_session->last_time = switch_time_now();
 	}
 
+	READ_INC(rtp_session);
+
 	while (switch_rtp_ready(rtp_session)) {
 		bytes = sizeof(rtp_msg_t);
 		status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock, 0, (void *) &rtp_session->recv_msg, &bytes);
@@ -1006,10 +1050,6 @@
 			switch_core_timer_step(&rtp_session->timer);
 		}
 
-		if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO)) {
-			return -1;
-		}
-
 		if (bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_RECV)) {
 			int sbytes = (int) bytes;
 			err_status_t stat = 0;
@@ -1020,7 +1060,8 @@
 				rtp_session->recv_ctx = NULL;
 				if ((stat = srtp_create(&rtp_session->recv_ctx, &rtp_session->recv_policy))) {
 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP RECV\n");
-					return -1;
+					ret = -1;
+					goto end;
 				} else {
 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "RE-Activating Secure RTP RECV\n");
 					rtp_session->srtp_errs = 0;
@@ -1035,7 +1076,8 @@
 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,
 									  "error: srtp unprotection failed with code %d%s\n", stat,
 									  stat == err_status_replay_fail ? " (replay check failed)" : stat == err_status_auth_fail ? " (auth check failed)" : "");
-					return -1;
+					ret = -1;
+					goto end;
 				} else {
 					sbytes = 0;
 				}
@@ -1075,11 +1117,13 @@
 			*flags |= SFF_CNG;
 			/* Return a CNG frame */
 			*payload_type = SWITCH_RTP_CNG_PAYLOAD;
-			return 2 + rtp_header_len;
+			ret = 2 + rtp_header_len;
+			goto end;
 		}
 
 		if (bytes < 0) {
-			return (int) bytes;
+			ret = (int) bytes;
+			goto end;
 		}
 
 		if (rtp_session->timer.interval) {
@@ -1101,7 +1145,8 @@
 			do_2833(rtp_session);
 			if (!bytes && rtp_session->max_missed_packets) {
 				if (++rtp_session->missed_count >= rtp_session->max_missed_packets) {
-					return -2;
+					ret = -2;
+					goto end;
 				}
 			}
 			
@@ -1126,7 +1171,8 @@
 				rtp_session->recv_msg.header.pt = (uint32_t) rtp_session->cng_pt ? rtp_session->cng_pt : SWITCH_RTP_CNG_PAYLOAD;
 				*flags |= SFF_CNG;
 				*payload_type = (switch_payload_t)rtp_session->recv_msg.header.pt;
-				return 2 + rtp_header_len;
+				ret = 2 + rtp_header_len;
+				goto end;
 			}
 		}
 
@@ -1146,7 +1192,8 @@
 			rtp_session->recv_msg.header.pt = (uint32_t) rtp_session->cng_pt ? rtp_session->cng_pt : SWITCH_RTP_CNG_PAYLOAD;
 			*flags |= SFF_CNG;
 			*payload_type = (switch_payload_t)rtp_session->recv_msg.header.pt;
-			return 2 + rtp_header_len;
+			ret = 2 + rtp_header_len;
+			goto end;
 		}
 
 		if (bytes > 0) {
@@ -1157,7 +1204,8 @@
 			if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) {
 				goto do_continue;
 			}
-			return 0;
+			ret = 0;
+			goto end;
 		}
 		
 		if (bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOADJ) && switch_sockaddr_get_port(rtp_session->from_addr)) {
@@ -1255,7 +1303,12 @@
 
 	do_continue:
 		
-		switch_yield(1000);
+		if (rtp_session->ms_per_packet) {
+            switch_yield((rtp_session->ms_per_packet / 1000) * 750);
+        } else {
+            switch_yield(1000);
+        }
+
 	}
 
 	*payload_type = (switch_payload_t) rtp_session->recv_msg.header.pt;
@@ -1268,7 +1321,13 @@
 		do_2833(rtp_session);
 	}
 
-	return (int) bytes;
+	ret = (int) bytes;
+
+ end:
+
+	READ_DEC(rtp_session);
+
+	return ret;
 }
 
 SWITCH_DECLARE(switch_size_t) switch_rtp_has_dtmf(switch_rtp_t *rtp_session)
@@ -1453,11 +1512,14 @@
 	switch_size_t bytes;
 	uint8_t send = 1;
 	uint32_t this_ts = 0;
+	int ret;
 
 	if (!switch_rtp_ready(rtp_session)) {
 		return SWITCH_STATUS_FALSE;
 	}
 
+	WRITE_INC(rtp_session);
+
 	if (send_msg) {
 		bytes = datalen;
 		if (flags && *flags & SFF_RFC2833) {
@@ -1600,7 +1662,8 @@
 				} 
 			}
 		} else {
-			return SWITCH_STATUS_GENERR;
+			ret = -1;
+			goto end;
 		}
 	}
 
@@ -1625,7 +1688,8 @@
 				rtp_session->send_ctx = NULL;
 				if ((stat = srtp_create(&rtp_session->send_ctx, &rtp_session->send_policy))) {
 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n");
-					return -1;
+					ret = -1;
+					goto end;
 				} else {
 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n");
 				}
@@ -1642,7 +1706,8 @@
 
 		if (switch_socket_sendto(rtp_session->sock, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
 			rtp_session->seq--;
-			return -1;
+			ret = -1;
+			goto end;
 		}
 		
 		if (rtp_session->timer.interval) {
@@ -1654,11 +1719,19 @@
 
 	if (rtp_session->ice_user) {
 		if (ice_out(rtp_session) != SWITCH_STATUS_SUCCESS) {
-			return -1;
+			ret = -1;
+			goto end;
 		}
 	}
 
-	return (int) bytes;
+	ret = (int) bytes;
+
+ end:
+
+	WRITE_DEC(rtp_session);
+
+	return ret;
+
 }
 
 
@@ -1729,16 +1802,12 @@
 	switch_payload_t payload;
 	rtp_msg_t *send_msg = NULL;
 
-	if (!switch_rtp_ready(rtp_session)) {
+	if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr) {
 		return -1;
 	}
 
 	fwd = (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_RAW_WRITE) && switch_test_flag(frame, SFF_RAW_RTP)) ? 1 : 0;
 
-	if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) || !rtp_session->remote_addr) {
-		return -1;
-	}
-
 	switch_assert(frame != NULL);
 
 	if (switch_test_flag(frame, SFF_CNG)) {
@@ -1771,18 +1840,13 @@
 											uint8_t m, switch_payload_t payload, uint32_t ts, switch_frame_flag_t *flags)
 {
 	switch_size_t bytes;
+	int ret = -1;
 
-	if (!switch_rtp_ready(rtp_session)) {
+	if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr || datalen > SWITCH_RTP_MAX_BUF_LEN) {
 		return -1;
 	}
 
-	if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) || !rtp_session->remote_addr) {
-		return -1;
-	}
-
-	if (datalen > SWITCH_RTP_MAX_BUF_LEN) {
-		return -1;
-	}
+	WRITE_INC(rtp_session);
 
 	rtp_session->write_msg = rtp_session->send_msg;
 	rtp_session->write_msg.header.seq = htons(++rtp_session->seq);
@@ -1803,7 +1867,8 @@
 			rtp_session->send_ctx = NULL;
 			if ((stat = srtp_create(&rtp_session->send_ctx, &rtp_session->send_policy))) {
 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n");
-				return -1;
+				ret = -1;
+				goto end;
 			} else {
 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n");
 			}
@@ -1818,12 +1883,20 @@
 
 	if (switch_socket_sendto(rtp_session->sock, rtp_session->remote_addr, 0, (void *) &rtp_session->write_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
 		rtp_session->seq--;
-		return -1;
+		ret = -1;
+		goto end;
 	}
 
 	rtp_session->last_write_ts = ts;
 
-	return (int) bytes;
+	ret = (int) bytes;
+	
+ end:
+
+	WRITE_DEC(rtp_session);
+
+	return ret;
+
 }
 
 SWITCH_DECLARE(uint32_t) switch_rtp_get_ssrc(switch_rtp_t *rtp_session)



More information about the Freeswitch-svn mailing list