Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/core/_core.mediatransport.pxi b/sipsimple/core/_core.mediatransport.pxi
index a08d6a96..8cc36cfc 100644
--- a/sipsimple/core/_core.mediatransport.pxi
+++ b/sipsimple/core/_core.mediatransport.pxi
@@ -1,1147 +1,1147 @@
# Copyright (C) 2008-2010 AG Projects. See LICENSE for details.
#
# python imports
from errno import EADDRINUSE
# classes
cdef class RTPTransport:
def __cinit__(self, *args, **kwargs):
cdef pj_pool_t *pool
cdef pjsip_endpoint *endpoint
cdef PJSIPUA ua
ua = _get_ua()
endpoint = ua._pjsip_endpoint._obj
pool_name = "RTPTransport_%d" % id(self)
self.weakref = weakref.ref(self)
Py_INCREF(self.weakref)
self._af = pj_AF_INET()
pj_mutex_create_recursive(ua._pjsip_endpoint._pool, "rtp_transport_lock", &self._lock)
with nogil:
pool = pjsip_endpt_create_pool(endpoint, pool_name, 4096, 4096)
if pool == NULL:
raise SIPCoreError("Could not allocate memory pool")
self._pool = pool
self.state = "NULL"
def __init__(self, local_rtp_address=None, use_srtp=False, srtp_forced=False, use_ice=False,
ice_stun_address=None, ice_stun_port=PJ_STUN_PORT):
cdef PJSIPUA ua = _get_ua()
if self.state != "NULL":
raise SIPCoreError("RTPTransport.__init__() was already called")
if local_rtp_address is not None and not _is_valid_ip(self._af, local_rtp_address):
raise ValueError("Not a valid IPv4 address: %s" % local_rtp_address)
if ice_stun_address is not None and not _is_valid_ip(self._af, ice_stun_address):
raise ValueError("Not a valid IPv4 address: %s" % ice_stun_address)
self._local_rtp_addr = local_rtp_address
self.use_srtp = use_srtp
self.srtp_forced = srtp_forced
self.use_ice = use_ice
self.ice_stun_address = ice_stun_address
self.ice_stun_port = ice_stun_port
def __dealloc__(self):
cdef PJSIPUA ua
cdef pjsip_endpoint *endpoint
cdef pjmedia_transport *transport
cdef pjmedia_transport *wrapped_transport
cdef pj_pool_t *pool
cdef Timer timer
try:
ua = _get_ua()
except SIPCoreError:
return
endpoint = ua._pjsip_endpoint._obj
pool = self._pool
transport = self._obj
wrapped_transport = self._wrapped_transport
if self.state in ["LOCAL", "ESTABLISHED"]:
with nogil:
pjmedia_transport_media_stop(transport)
if self._obj != NULL:
with nogil:
pjmedia_transport_close(transport)
if self._obj.type == PJMEDIA_TRANSPORT_TYPE_ICE:
(<void **> (self._obj.name + 1))[0] = NULL
if self._wrapped_transport != NULL:
if self._wrapped_transport.type == PJMEDIA_TRANSPORT_TYPE_ICE:
(<void **> (self._obj.name + 1))[0] = NULL
self._wrapped_transport = NULL
self._obj = NULL
if self._wrapped_transport != NULL:
if self._wrapped_transport.type == PJMEDIA_TRANSPORT_TYPE_ICE:
(<void **> (self._obj.name + 1))[0] = NULL
with nogil:
pjmedia_transport_close(wrapped_transport)
if self._pool != NULL:
with nogil:
pjsip_endpt_release_pool(endpoint, pool)
timer = Timer()
try:
timer.schedule(60, deallocate_weakref, self.weakref)
except SIPCoreError:
pass
cdef PJSIPUA _check_ua(self):
cdef PJSIPUA ua
try:
ua = _get_ua()
return ua
except:
self.state = "INVALID"
self._obj = NULL
self._wrapped_transport = NULL
self._pool = NULL
return None
cdef int _get_info(self, pjmedia_transport_info *info) except -1:
cdef int status
cdef pjmedia_transport *transport
transport = self._obj
with nogil:
pjmedia_transport_info_init(info)
status = pjmedia_transport_get_info(transport, info)
if status != 0:
raise PJSIPError("Could not get transport info", status)
return 0
property local_rtp_port:
def __get__(self):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_transport_info info
cdef PJSIPUA ua
ua = self._check_ua()
if ua is None:
return None
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
if self.state in ["NULL", "WAIT_STUN", "INVALID"]:
return None
self._get_info(&info)
if info.sock_info.rtp_addr_name.addr.sa_family != 0:
return pj_sockaddr_get_port(&info.sock_info.rtp_addr_name)
else:
return None
finally:
with nogil:
pj_mutex_unlock(lock)
property local_rtp_address:
def __get__(self):
cdef char buf[PJ_INET6_ADDRSTRLEN]
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_transport_info info
cdef PJSIPUA ua
ua = self._check_ua()
if ua is None:
return self._local_rtp_addr
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
if self.state in ["NULL", "WAIT_STUN", "INVALID"]:
return self._local_rtp_addr
self._get_info(&info)
if pj_sockaddr_has_addr(&info.sock_info.rtp_addr_name):
return pj_sockaddr_print(&info.sock_info.rtp_addr_name, buf, PJ_INET6_ADDRSTRLEN, 0)
else:
return None
finally:
with nogil:
pj_mutex_unlock(lock)
property remote_rtp_port_received:
def __get__(self):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_transport_info info
cdef PJSIPUA ua
ua = self._check_ua()
if ua is None:
return None
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
if self.state in ["NULL", "WAIT_STUN", "INVALID"]:
return None
if self._ice_active:
return self.remote_rtp_port_ice
self._get_info(&info)
if info.src_rtp_name.addr.sa_family != 0:
return pj_sockaddr_get_port(&info.src_rtp_name)
else:
return None
finally:
with nogil:
pj_mutex_unlock(lock)
property remote_rtp_address_received:
def __get__(self):
cdef char buf[PJ_INET6_ADDRSTRLEN]
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_transport_info info
cdef PJSIPUA ua
ua = self._check_ua()
if ua is None:
return None
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
if self.state in ["NULL", "WAIT_STUN", "INVALID"]:
return None
if self._ice_active:
return self.remote_rtp_address_ice
self._get_info(&info)
if pj_sockaddr_has_addr(&info.src_rtp_name):
return pj_sockaddr_print(&info.src_rtp_name, buf, PJ_INET6_ADDRSTRLEN, 0)
else:
return None
finally:
with nogil:
pj_mutex_unlock(lock)
property srtp_active:
def __get__(self):
cdef int i
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_srtp_info *srtp_info
cdef pjmedia_transport_info info
cdef PJSIPUA ua
ua = self._check_ua()
if ua is None:
return False
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
if self.state in ["NULL", "WAIT_STUN", "INVALID"]:
return False
self._get_info(&info)
for i from 0 <= i < info.specific_info_cnt:
if info.spc_info[i].type == PJMEDIA_TRANSPORT_TYPE_SRTP:
srtp_info = <pjmedia_srtp_info *> info.spc_info[i].buffer
return bool(srtp_info.active)
return False
finally:
with nogil:
pj_mutex_unlock(lock)
property ice_active:
def __get__(self):
return bool(self._ice_active)
property local_rtp_candidate_type:
def __get__(self):
return self._local_rtp_candidate_type if self._ice_active else None
property remote_rtp_candidate_type:
def __get__(self):
return self._remote_rtp_candidate_type if self._ice_active else None
cdef int _update_local_sdp(self, SDPSession local_sdp, int sdp_index, pjmedia_sdp_session *remote_sdp) except -1:
cdef int status
cdef pj_pool_t *pool
cdef pjmedia_sdp_session *pj_local_sdp
cdef pjmedia_transport *transport
pj_local_sdp = local_sdp.get_sdp_session()
pool = self._pool
transport = self._obj
if sdp_index < 0:
raise ValueError("sdp_index argument cannot be negative")
if sdp_index >= local_sdp.get_sdp_session().media_count:
raise ValueError("sdp_index argument out of range")
with nogil:
status = pjmedia_transport_media_create(transport, pool, 0, remote_sdp, sdp_index)
if status != 0:
raise PJSIPError("Could not create media transport", status)
with nogil:
status = pjmedia_transport_encode_sdp(transport, pool, pj_local_sdp, remote_sdp, sdp_index)
if status != 0:
raise PJSIPError("Could not update SDP for media transport", status)
local_sdp._update()
return 0
def set_LOCAL(self, SDPSession local_sdp, int sdp_index):
cdef int status
cdef pj_mutex_t *lock = self._lock
_get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
if local_sdp is None:
raise SIPCoreError("local_sdp argument cannot be None")
if self.state == "LOCAL":
return
if self.state != "INIT":
raise SIPCoreError('set_LOCAL can only be called in the "INIT" state, current state is "%s"' % self.state)
self._update_local_sdp(local_sdp, sdp_index, NULL)
self.state = "LOCAL"
finally:
with nogil:
pj_mutex_unlock(lock)
def set_ESTABLISHED(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_sdp_session *pj_local_sdp
cdef pjmedia_sdp_session *pj_remote_sdp
cdef pjmedia_transport *transport = self._obj
_get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
transport = self._obj
if None in [local_sdp, remote_sdp]:
raise SIPCoreError("SDP arguments cannot be None")
pj_local_sdp = local_sdp.get_sdp_session()
pj_remote_sdp = remote_sdp.get_sdp_session()
if self.state == "ESTABLISHED":
return
if self.state not in ["INIT", "LOCAL"]:
raise SIPCoreError('set_ESTABLISHED can only be called in the "INIT" and "LOCAL" states, ' +
'current state is "%s"' % self.state)
if self.state == "INIT":
if not isinstance(local_sdp, SDPSession):
raise TypeError('local_sdp argument should be of type SDPSession when going from the "INIT" to the "ESTABLISHED" state')
self._update_local_sdp(<SDPSession>local_sdp, sdp_index, pj_remote_sdp)
with nogil:
status = pjmedia_transport_media_start(transport, self._pool, pj_local_sdp, pj_remote_sdp, sdp_index)
if status != 0:
raise PJSIPError("Could not start media transport", status)
if remote_sdp.media[sdp_index].connection is None:
if remote_sdp.connection is not None:
self.remote_rtp_address_sdp = remote_sdp.connection.address
else:
self.remote_rtp_address_sdp = remote_sdp.media[sdp_index].connection.address
self.remote_rtp_port_sdp = remote_sdp.media[sdp_index].port
self.state = "ESTABLISHED"
finally:
with nogil:
pj_mutex_unlock(lock)
def set_INIT(self):
global _ice_cb
cdef int af
cdef int i
cdef int status
cdef int port
cdef pj_caching_pool *caching_pool
cdef pj_ice_strans_cfg ice_cfg
cdef pj_mutex_t *lock = self._lock
cdef pj_str_t local_ip
cdef pj_str_t *local_ip_address = &local_ip
cdef pjmedia_endpt *media_endpoint
cdef pjmedia_srtp_setting srtp_setting
cdef pjmedia_transport **transport_address
cdef pjmedia_transport *wrapped_transport
cdef pjsip_endpoint *sip_endpoint
cdef PJSIPUA ua
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
af = self._af
caching_pool = &ua._caching_pool._obj
media_endpoint = ua._pjmedia_endpoint._obj
sip_endpoint = ua._pjsip_endpoint._obj
transport_address = &self._obj
if self.state == "INIT":
return
if self.state in ["LOCAL", "ESTABLISHED"]:
with nogil:
status = pjmedia_transport_media_stop(transport_address[0])
if status != 0:
raise PJSIPError("Could not stop media transport", status)
self.remote_rtp_address_sdp = None
self.remote_rtp_port_sdp = None
self.remote_rtp_address_ice = None
self.remote_rtp_port_ice = None
self.state = "INIT"
elif self.state == "NULL":
if self._local_rtp_addr is None:
local_ip_address = NULL
else:
_str_to_pj_str(self._local_rtp_addr, &local_ip)
if self.use_ice:
with nogil:
pj_ice_strans_cfg_default(&ice_cfg)
ice_cfg.af = self._af
with nogil:
pj_stun_config_init(&ice_cfg.stun_cfg, &caching_pool.factory, 0,
pjmedia_endpt_get_ioqueue(media_endpoint),
pjsip_endpt_get_timer_heap(sip_endpoint))
if self.ice_stun_address is not None:
_str_to_pj_str(self.ice_stun_address, &ice_cfg.stun.server)
ice_cfg.stun.port = self.ice_stun_port
# IIRC we can't choose the port for ICE
with nogil:
status = pj_sockaddr_init(ice_cfg.af, &ice_cfg.stun.cfg.bound_addr, local_ip_address, 0)
if status != 0:
raise PJSIPError("Could not init ICE bound address", status)
# The state callback won't be called for the 'ICE Candidates Gathering' state because transport is not
# yet initialized, so we fake it. -Saul
_add_event("RTPTransportICENegotiationStateDidChange", dict(obj=self, state="ICE Candidates Gathering"))
with nogil:
status = pjmedia_ice_create2(media_endpoint, NULL, 2, &ice_cfg, &_ice_cb, 0, transport_address)
if status != 0:
raise PJSIPError("Could not create ICE media transport", status)
(<void **> (self._obj.name + 1))[0] = <void *> self.weakref
else:
status = PJ_EBUG
for i in xrange(ua._rtp_port_index, ua._rtp_port_index + ua._rtp_port_usable_count, 2):
port = ua._rtp_port_start + i % ua._rtp_port_usable_count
with nogil:
status = pjmedia_transport_udp_create3(media_endpoint, af, NULL, local_ip_address,
port, 0, transport_address)
if status != PJ_ERRNO_START_SYS + EADDRINUSE:
ua._rtp_port_index = (i + 2) % ua._rtp_port_usable_count
break
if status != 0:
raise PJSIPError("Could not create UDP/RTP media transport", status)
if self.use_srtp:
wrapped_transport = self._wrapped_transport = self._obj
self._obj = NULL
with nogil:
pjmedia_srtp_setting_default(&srtp_setting)
if self.srtp_forced:
srtp_setting.use = PJMEDIA_SRTP_MANDATORY
with nogil:
status = pjmedia_transport_srtp_create(media_endpoint, wrapped_transport, &srtp_setting, transport_address)
if status != 0:
with nogil:
pjmedia_transport_close(wrapped_transport)
self._wrapped_transport = NULL
raise PJSIPError("Could not create SRTP media transport", status)
if not self.use_ice or self.ice_stun_address is None:
self.state = "INIT"
_add_event("RTPTransportDidInitialize", dict(obj=self))
else:
self.state = "WAIT_STUN"
else:
raise SIPCoreError('set_INIT can only be called in the "NULL", "LOCAL" and "ESTABLISHED" states, ' +
'current state is "%s"' % self.state)
finally:
with nogil:
pj_mutex_unlock(lock)
cdef class MediaCheckTimer(Timer):
def __init__(self, media_check_interval):
self.media_check_interval = media_check_interval
cdef class AudioTransport:
def __cinit__(self, *args, **kwargs):
cdef pj_pool_t *pool
cdef pjsip_endpoint *endpoint
cdef PJSIPUA ua
ua = _get_ua()
endpoint = ua._pjsip_endpoint._obj
pool_name = "AudioTransport_%d" % id(self)
self.weakref = weakref.ref(self)
Py_INCREF(self.weakref)
pj_mutex_create_recursive(ua._pjsip_endpoint._pool, "audio_transport_lock", &self._lock)
with nogil:
pool = pjsip_endpt_create_pool(endpoint, pool_name, 4096, 4096)
if pool == NULL:
raise SIPCoreError("Could not allocate memory pool")
self._pool = pool
self._slot = -1
self._timer = None
self._volume = 100
def __init__(self, AudioMixer mixer, RTPTransport transport,
BaseSDPSession remote_sdp=None, int sdp_index=0, enable_silence_detection=False, list codecs=None):
cdef int status
cdef pj_pool_t *pool
cdef pjmedia_endpt *media_endpoint
cdef pjmedia_sdp_media *local_media
cdef pjmedia_sdp_session *local_sdp_c
cdef pjmedia_transport_info info
cdef list global_codecs
cdef SDPSession local_sdp
cdef PJSIPUA ua
ua = _get_ua()
media_endpoint = ua._pjmedia_endpoint._obj
pool = self._pool
if self.transport is not None:
raise SIPCoreError("AudioTransport.__init__() was already called")
if mixer is None:
raise ValueError("mixer argument may not be None")
if transport is None:
raise ValueError("transport argument cannot be None")
if sdp_index < 0:
raise ValueError("sdp_index argument cannot be negative")
if transport.state != "INIT":
raise SIPCoreError('RTPTransport object provided is not in the "INIT" state, but in the "%s" state' %
transport.state)
self._vad = int(bool(enable_silence_detection))
self.mixer = mixer
self.transport = transport
transport._get_info(&info)
global_codecs = ua._pjmedia_endpoint._get_current_codecs()
if codecs is None:
codecs = global_codecs
try:
ua._pjmedia_endpoint._set_codecs(codecs, self.mixer.sample_rate)
with nogil:
status = pjmedia_endpt_create_sdp(media_endpoint, pool, 1, &info.sock_info, &local_sdp_c)
if status != 0:
raise PJSIPError("Could not generate SDP for audio session", status)
finally:
ua._pjmedia_endpoint._set_codecs(global_codecs, 32000)
local_sdp = SDPSession_create(local_sdp_c)
if remote_sdp is None:
self._is_offer = 1
self.transport.set_LOCAL(local_sdp, 0)
else:
self._is_offer = 0
if sdp_index != 0:
local_sdp.media = (sdp_index+1) * local_sdp.media
self.transport.set_ESTABLISHED(local_sdp, remote_sdp, sdp_index)
local_sdp_c = local_sdp.get_sdp_session()
with nogil:
local_media = pjmedia_sdp_media_clone(pool, local_sdp_c.media[sdp_index])
self._local_media = local_media
def __dealloc__(self):
cdef PJSIPUA ua
cdef Timer timer
try:
ua = _get_ua()
except SIPCoreError:
return
cdef pjsip_endpoint *endpoint = ua._pjsip_endpoint._obj
cdef pj_pool_t *pool = self._pool
if self._obj != NULL:
self.stop()
if self._pool != NULL:
with nogil:
pjsip_endpt_release_pool(endpoint, pool)
timer = Timer()
try:
timer.schedule(60, deallocate_weakref, self.weakref)
except SIPCoreError:
pass
cdef PJSIPUA _check_ua(self):
cdef PJSIPUA ua
try:
ua = _get_ua()
return ua
except:
self._obj = NULL
self._pool = NULL
return None
property is_active:
def __get__(self):
self._check_ua()
return bool(self._obj != NULL)
property is_started:
def __get__(self):
return bool(self._is_started)
property codec:
def __get__(self):
self._check_ua()
if self._obj == NULL:
return None
else:
return _pj_str_to_str(self._stream_info.fmt.encoding_name)
property sample_rate:
def __get__(self):
self._check_ua()
if self._obj == NULL:
return None
else:
return self._stream_info.fmt.clock_rate
property enable_silence_detection:
def __get__(self):
return bool(self._vad)
property statistics:
def __get__(self):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_rtcp_stat stat
cdef pjmedia_stream *stream
cdef dict statistics = dict()
cdef PJSIPUA ua
ua = self._check_ua()
if ua is None:
return None
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
stream = self._obj
if self._cached_statistics is not None:
return self._cached_statistics.copy()
if self._obj == NULL:
return None
with nogil:
status = pjmedia_stream_get_stat(stream, &stat)
if status != 0:
raise PJSIPError("Could not get RTP statistics", status)
statistics["rtt"] = _pj_math_stat_to_dict(&stat.rtt)
statistics["rx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.rx)
statistics["tx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.tx)
return statistics
finally:
with nogil:
pj_mutex_unlock(lock)
property volume:
def __get__(self):
return self._volume
def __set__(self, value):
cdef int slot
cdef int status
cdef int volume
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_conf *conf_bridge
cdef PJSIPUA ua
ua = self._check_ua()
if ua is not None:
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
conf_bridge = self.mixer._obj
slot = self._slot
if value < 0:
raise ValueError("volume attribute cannot be negative")
if ua is not None and self._obj != NULL:
volume = int(value * 1.28 - 128)
with nogil:
status = pjmedia_conf_adjust_rx_level(conf_bridge, slot, volume)
if status != 0:
raise PJSIPError("Could not set volume of audio transport", status)
self._volume = value
finally:
if ua is not None:
with nogil:
pj_mutex_unlock(lock)
property slot:
def __get__(self):
self._check_ua()
if self._slot == -1:
return None
else:
return self._slot
def get_local_media(self, is_offer, direction="sendrecv"):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef object direction_attr
cdef SDPAttribute attr
cdef SDPMediaStream local_media
_get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
if is_offer and direction not in ["sendrecv", "sendonly", "recvonly", "inactive"]:
raise SIPCoreError("Unknown direction: %s" % direction)
local_media = SDPMediaStream_create(self._local_media)
local_media.attributes = [<object> attr for attr in local_media.attributes if attr.name not in ["sendrecv",
"sendonly",
"recvonly",
"inactive"]]
if is_offer:
direction_attr = direction
else:
if self.direction is None or "recv" in self.direction:
direction_attr = "sendrecv"
else:
direction_attr = "sendonly"
local_media.attributes.append(SDPAttribute(direction_attr, ""))
for attribute in local_media.attributes:
if attribute.name == 'rtcp':
attribute.value = attribute.value.split(' ', 1)[0]
return local_media
finally:
with nogil:
pj_mutex_unlock(lock)
def start(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index,
int no_media_timeout=10, int media_check_interval=30):
cdef int status
cdef object desired_state
cdef pj_mutex_t *lock = self._lock
cdef pj_pool_t *pool
cdef pjmedia_endpt *media_endpoint
cdef pjmedia_port *media_port
cdef pjmedia_sdp_media *local_media
cdef pjmedia_sdp_session *pj_local_sdp
cdef pjmedia_sdp_session *pj_remote_sdp
cdef pjmedia_stream **stream_address
cdef pjmedia_stream_info *stream_info_address
cdef pjmedia_transport *transport
cdef PJSIPUA ua
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
pool = self._pool
media_endpoint = ua._pjmedia_endpoint._obj
stream_address = &self._obj
stream_info_address = &self._stream_info
transport = self.transport._obj
if self._is_started:
raise SIPCoreError("This AudioTransport was already started once")
desired_state = ("LOCAL" if self._is_offer else "ESTABLISHED")
if self.transport.state != desired_state:
raise SIPCoreError('RTPTransport object provided is not in the "%s" state, but in the "%s" state' %
(desired_state, self.transport.state))
if None in [local_sdp, remote_sdp]:
raise ValueError("SDP arguments cannot be None")
pj_local_sdp = local_sdp.get_sdp_session()
pj_remote_sdp = remote_sdp.get_sdp_session()
if sdp_index < 0:
raise ValueError("sdp_index argument cannot be negative")
if local_sdp.media[sdp_index].port == 0 or remote_sdp.media[sdp_index].port == 0:
raise SIPCoreError("Cannot start a rejected audio stream")
if no_media_timeout < 0:
raise ValueError("no_media_timeout value cannot be negative")
if media_check_interval < 0:
raise ValueError("media_check_interval value cannot be negative")
if self.transport.state == "LOCAL":
self.transport.set_ESTABLISHED(local_sdp, remote_sdp, sdp_index)
with nogil:
status = pjmedia_stream_info_from_sdp(stream_info_address, pool, media_endpoint,
pj_local_sdp, pj_remote_sdp, sdp_index)
if status != 0:
raise PJSIPError("Could not parse SDP for audio session", status)
if self._stream_info.param == NULL:
raise SIPCoreError("Could not parse SDP for audio session")
self._stream_info.param.setting.vad = self._vad
with nogil:
status = pjmedia_stream_create(media_endpoint, pool, stream_info_address,
transport, NULL, stream_address)
if status != 0:
raise PJSIPError("Could not initialize RTP for audio session", status)
with nogil:
status = pjmedia_stream_set_dtmf_callback(stream_address[0], _AudioTransport_cb_dtmf, <void *> self.weakref)
if status != 0:
with nogil:
pjmedia_stream_destroy(stream_address[0])
self._obj = NULL
raise PJSIPError("Could not set DTMF callback for audio session", status)
with nogil:
status = pjmedia_stream_start(stream_address[0])
if status != 0:
with nogil:
pjmedia_stream_destroy(stream_address[0])
self._obj = NULL
raise PJSIPError("Could not start RTP for audio session", status)
with nogil:
status = pjmedia_stream_get_port(stream_address[0], &media_port)
if status != 0:
with nogil:
pjmedia_stream_destroy(stream_address[0])
self._obj = NULL
raise PJSIPError("Could not get audio port for audio session", status)
try:
self._slot = self.mixer._add_port(ua, pool, media_port)
if self._volume != 100:
self.volume = self._volume
except:
with nogil:
pjmedia_stream_destroy(stream_address[0])
self._obj = NULL
raise
self.direction = "sendrecv"
self.update_direction(local_sdp.media[sdp_index].direction)
with nogil:
local_media = pjmedia_sdp_media_clone(pool, pj_local_sdp.media[sdp_index])
self._local_media = local_media
self._is_started = 1
if no_media_timeout > 0:
self._timer = MediaCheckTimer(media_check_interval)
self._timer.schedule(no_media_timeout, <timer_callback>self._cb_check_rtp, self)
finally:
with nogil:
pj_mutex_unlock(lock)
def stop(self):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_stream *stream
cdef PJSIPUA ua
ua = self._check_ua()
if ua is not None:
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
stream = self._obj
if self._timer is not None:
self._timer.cancel()
self._timer = None
if self._obj == NULL:
return
self.mixer._remove_port(ua, self._slot)
self._cached_statistics = self.statistics
with nogil:
pjmedia_stream_destroy(stream)
self._obj = NULL
self.transport.set_INIT()
finally:
if ua is not None:
with nogil:
pj_mutex_unlock(lock)
def update_direction(self, direction):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_stream *stream
_get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
stream = self._obj
if self._obj == NULL:
raise SIPCoreError("Stream is not active")
if direction not in ["sendrecv", "sendonly", "recvonly", "inactive"]:
raise SIPCoreError("Unknown direction: %s" % direction)
if direction == self.direction:
return
self.direction = direction
finally:
with nogil:
pj_mutex_unlock(lock)
def send_dtmf(self, digit):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pj_str_t digit_pj
cdef pjmedia_stream *stream
cdef PJSIPUA ua
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
stream = self._obj
if self._obj == NULL:
raise SIPCoreError("Stream is not active")
if len(digit) != 1 or digit not in "0123456789*#ABCD":
raise SIPCoreError("Not a valid DTMF digit: %s" % digit)
_str_to_pj_str(digit, &digit_pj)
if not self._stream_info.tx_event_pt < 0:
# If the remote doesn't support telephone-event just don't send DTMF
with nogil:
status = pjmedia_stream_dial_dtmf(stream, &digit_pj)
if status != 0:
raise PJSIPError("Could not send DTMF digit on audio stream", status)
finally:
with nogil:
pj_mutex_unlock(lock)
cdef int _cb_check_rtp(self, MediaCheckTimer timer) except -1 with gil:
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_rtcp_stat stat
cdef pjmedia_stream *stream
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
stream = self._obj
if stream == NULL:
return 0
if self._timer is None:
return 0
self._timer = None
with nogil:
status = pjmedia_stream_get_stat(stream, &stat)
if status == 0:
if self._packets_received == stat.rx.pkt and self.direction == "sendrecv":
- _add_event("RTPAudioTransportDidNotGetRTP", dict(obj=self, got_any=(stat.rx.pkt != 0)))
+ _add_event("RTPAudioTransportDidTimeout", dict(obj=self))
self._packets_received = stat.rx.pkt
if timer.media_check_interval > 0:
self._timer = MediaCheckTimer(timer.media_check_interval)
self._timer.schedule(timer.media_check_interval, <timer_callback>self._cb_check_rtp, self)
finally:
with nogil:
pj_mutex_unlock(lock)
# helper functions
cdef dict _pj_math_stat_to_dict(pj_math_stat *stat):
cdef dict retval = dict()
retval["count"] = stat.n
retval["max"] = stat.max
retval["min"] = stat.min
retval["last"] = stat.last
retval["avg"] = stat.mean
return retval
cdef dict _pjmedia_rtcp_stream_stat_to_dict(pjmedia_rtcp_stream_stat *stream_stat):
cdef dict retval = dict()
retval["packets"] = stream_stat.pkt
retval["bytes"] = stream_stat.bytes
retval["packets_discarded"] = stream_stat.discard
retval["packets_lost"] = stream_stat.loss
retval["packets_reordered"] = stream_stat.reorder
retval["packets_duplicate"] = stream_stat.dup
retval["loss_period"] = _pj_math_stat_to_dict(&stream_stat.loss_period)
retval["burst_loss"] = bool(stream_stat.loss_type.burst)
retval["random_loss"] = bool(stream_stat.loss_type.random)
retval["jitter"] = _pj_math_stat_to_dict(&stream_stat.jitter)
return retval
# callback functions
cdef void _RTPTransport_cb_ice_complete(pjmedia_transport *tp, pj_ice_strans_op op, int status) with gil:
cdef void *rtp_transport_void = NULL
cdef RTPTransport rtp_transport
cdef PJSIPUA ua
try:
ua = _get_ua()
except:
return
try:
if tp != NULL:
rtp_transport_void = (<void **> (tp.name + 1))[0]
if rtp_transport_void != NULL:
rtp_transport = (<object> rtp_transport_void)()
if rtp_transport is None:
return
if op == PJ_ICE_STRANS_OP_NEGOTIATION:
if status == 0:
rtp_transport._ice_active = 1
else:
_add_event("RTPTransportICENegotiationDidFail", dict(obj=rtp_transport, reason=_pj_status_to_str(status)))
elif op == PJ_ICE_STRANS_OP_INIT:
if status == 0:
rtp_transport.state = "INIT"
else:
rtp_transport.state = "INVALID"
if status == 0:
_add_event("RTPTransportDidInitialize", dict(obj=rtp_transport))
else:
_add_event("RTPTransportDidFail", dict(obj=rtp_transport, reason=_pj_status_to_str(status)))
except:
ua._handle_exception(1)
cdef void _RTPTransport_cb_ice_candidates_chosen(pjmedia_transport *tp, int status, pj_ice_candidate_pair rtp_pair, pj_ice_candidate_pair rtcp_pair, char *duration, char *local_candidates, char *remote_candidates, char *valid_list) with gil:
cdef void *rtp_transport_void = NULL
cdef RTPTransport rtp_transport
cdef PJSIPUA ua
cdef dict chosen_local_candidates
cdef dict chosen_remote_candidates
cdef list local_cand_list = list()
cdef list remote_cand_list = list()
cdef list v_list = list()
try:
ua = _get_ua()
except:
return
try:
if tp != NULL:
rtp_transport_void = (<void **> (tp.name + 1))[0]
if rtp_transport_void != NULL:
rtp_transport = (<object> rtp_transport_void)()
if rtp_transport is None:
return
if status == 0:
for item in local_candidates.split("\r\n"):
if item:
item_id, component_id, address, comp_type = item.split(" ")
local_cand_list.append([item_id, component_id, address, comp_type])
for item in remote_candidates.split("\r\n"):
if item:
item_id, component_id, address, comp_type = item.split(" ")
remote_cand_list.append([item_id, component_id, address, comp_type])
for item in valid_list.split("\r\n"):
if item:
item_id, component_id, source, destination, nomination, state = item.split(" ")
v_list.append([item_id, component_id, source, destination, nomination, state])
chosen_local_candidates = dict(rtp_cand_type=rtp_pair.local_type, rtp_cand_ip=rtp_pair.local_ip, rtcp_cand_type=rtcp_pair.local_type, rtcp_cand_ip=rtcp_pair.local_ip)
chosen_remote_candidates = dict(rtp_cand_type=rtp_pair.remote_type, rtp_cand_ip=rtp_pair.remote_ip, rtcp_cand_type=rtcp_pair.remote_type, rtcp_cand_ip=rtcp_pair.remote_ip)
rtp_transport.remote_rtp_address_ice, rtp_transport.remote_rtp_port_ice = rtp_pair.remote_ip.split(":")
rtp_transport.remote_rtp_port_ice = int(rtp_transport.remote_rtp_port_ice)
rtp_transport._local_rtp_candidate_type = rtp_pair.local_type
rtp_transport._remote_rtp_candidate_type = rtp_pair.remote_type
_add_event("RTPTransportICENegotiationDidSucceed", dict(obj=rtp_transport, chosen_local_candidates=chosen_local_candidates, chosen_remote_candidates=chosen_remote_candidates, duration=duration, local_candidates=local_cand_list, remote_candidates=remote_cand_list, connectivity_checks_results=v_list))
except:
ua._handle_exception(1)
cdef void _RTPTransport_cb_ice_failure(pjmedia_transport *tp, char *reason) with gil:
cdef void *rtp_transport_void = NULL
cdef RTPTransport rtp_transport
cdef PJSIPUA ua
try:
ua = _get_ua()
except:
return
try:
if tp != NULL:
rtp_transport_void = (<void **> (tp.name + 1))[0]
if rtp_transport_void != NULL:
rtp_transport = (<object> rtp_transport_void)()
if rtp_transport is None:
return
_reason = reason
if _reason != "media stop requested":
rtp_transport._local_rtp_candidate_type = None
rtp_transport._remote_rtp_candidate_type = None
_add_event("RTPTransportICENegotiationDidFail", dict(obj=rtp_transport, reason=_reason))
except:
ua._handle_exception(1)
cdef void _RTPTransport_cb_ice_state(pjmedia_transport *tp, char *state) with gil:
cdef void *rtp_transport_void = NULL
cdef RTPTransport rtp_transport
cdef PJSIPUA ua
try:
ua = _get_ua()
except:
return
try:
if tp != NULL:
rtp_transport_void = (<void **> (tp.name + 1))[0]
if rtp_transport_void != NULL:
rtp_transport = (<object> rtp_transport_void)()
if rtp_transport is None:
return
_add_event("RTPTransportICENegotiationStateDidChange", dict(obj=rtp_transport, state=state))
except:
ua._handle_exception(1)
cdef void _AudioTransport_cb_dtmf(pjmedia_stream *stream, void *user_data, int digit) with gil:
cdef AudioTransport audio_stream = (<object> user_data)()
cdef PJSIPUA ua
try:
ua = _get_ua()
except:
return
if audio_stream is None:
return
try:
_add_event("RTPAudioStreamGotDTMF", dict(obj=audio_stream, digit=chr(digit)))
except:
ua._handle_exception(1)
# globals
cdef pjmedia_ice_cb _ice_cb
_ice_cb.on_ice_complete = _RTPTransport_cb_ice_complete
_ice_cb.on_ice_candidates_chosen = _RTPTransport_cb_ice_candidates_chosen
_ice_cb.on_ice_failure = _RTPTransport_cb_ice_failure
_ice_cb.on_ice_state = _RTPTransport_cb_ice_state
diff --git a/sipsimple/streams/rtp.py b/sipsimple/streams/rtp.py
index 05f79342..ee584dce 100644
--- a/sipsimple/streams/rtp.py
+++ b/sipsimple/streams/rtp.py
@@ -1,534 +1,537 @@
# Copyright (C) 2009-2010 AG Projects. See LICENSE for details.
#
"""
Handling of RTP media streams according to RFC3550, RFC3605, RFC3581,
RFC2833 and RFC3711, RFC3489 and draft-ietf-mmusic-ice-19.
"""
from __future__ import with_statement
__all__ = ['AudioStream']
from threading import RLock
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python.util import Null
from zope.interface import implements
from sipsimple.account import BonjourAccount
from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort, WaveRecorder
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import AudioTransport, PJSIPError, RTPTransport, SIPCoreError, SIPURI
from sipsimple.lookup import DNSLookup
from sipsimple.streams import IMediaStream, InvalidStreamError, MediaStreamRegistrar, UnknownStreamError
from sipsimple.util import TimestampedNotificationData
class AudioStream(object):
__metaclass__ = MediaStreamRegistrar
implements(IMediaStream, IAudioPort, IObserver)
_streams = []
type = 'audio'
priority = 1
hold_supported = True
def __init__(self, account):
from sipsimple.application import SIPApplication
self.account = account
self.mixer = SIPApplication.voice_audio_mixer
self.bridge = AudioBridge(self.mixer)
self.device = AudioDevice(self.mixer)
self.notification_center = NotificationCenter()
self.on_hold_by_local = False
self.on_hold_by_remote = False
self.state = "NULL"
self._audio_rec = None
self._audio_transport = None
self._hold_request = None
self._ice_state = "NULL"
self._lock = RLock()
self._rtp_transport = None
self._session = None
self._try_ice = False
self._try_forced_srtp = False
self._use_srtp = False
self.bridge.add(self.device)
# Audio properties
#
@property
def codec(self):
return self._audio_transport.codec if self._audio_transport else None
@property
def consumer_slot(self):
return self._audio_transport.slot if self._audio_transport else None
@property
def producer_slot(self):
return self._audio_transport.slot if self._audio_transport and not self.muted else None
@property
def sample_rate(self):
return self._audio_transport.sample_rate if self._audio_transport else None
@property
def statistics(self):
return self._audio_transport.statistics if self._audio_transport else None
def _get_muted(self):
return self.__dict__.get('muted', False)
def _set_muted(self, value):
if not isinstance(value, bool):
raise ValueError("illegal value for muted property: %r" % (value,))
if value == self.muted:
return
old_producer_slot = self.producer_slot
self.__dict__['muted'] = value
notification_center = NotificationCenter()
notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=TimestampedNotificationData(consumer_slot_changed=False, producer_slot_changed=True,
old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot))
muted = property(_get_muted, _set_muted)
del _get_muted, _set_muted
# RTP properties
#
@property
def local_rtp_address(self):
return self._rtp_transport.local_rtp_address if self._rtp_transport else None
@property
def local_rtp_port(self):
return self._rtp_transport.local_rtp_port if self._rtp_transport else None
@property
def remote_rtp_address(self):
if self._ice_state == "IN_USE":
return self._rtp_transport.remote_rtp_address_received if self._rtp_transport else None
else:
return self._rtp_transport.remote_rtp_address_sdp if self._rtp_transport else None
@property
def remote_rtp_port(self):
if self._ice_state == "IN_USE":
return self._rtp_transport.remote_rtp_port_received if self._rtp_transport else None
else:
return self._rtp_transport.remote_rtp_port_sdp if self._rtp_transport else None
@property
def local_rtp_candidate_type(self):
return self._rtp_transport.local_rtp_candidate_type if self._rtp_transport else None
@property
def remote_rtp_candidate_type(self):
return self._rtp_transport.remote_rtp_candidate_type if self._rtp_transport else None
@property
def srtp_active(self):
return self._rtp_transport.srtp_active if self._rtp_transport else False
@property
def ice_active(self):
return self._ice_state == "IN_USE"
# Generic properties
#
@property
def on_hold(self):
return self.on_hold_by_local or self.on_hold_by_remote
@property
def recording_active(self):
return bool(self._audio_rec and self._audio_rec.is_active)
@property
def recording_filename(self):
recording = self._audio_rec
return recording.filename if recording else None
# Public methods
#
@classmethod
def new_from_sdp(cls, account, remote_sdp, stream_index):
# TODO: actually validate the SDP
settings = SIPSimpleSettings()
remote_stream = remote_sdp.media[stream_index]
if remote_stream.media != 'audio':
raise UnknownStreamError
if remote_stream.transport not in ('RTP/AVP', 'RTP/SAVP'):
raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in audio stream, got %s" % remote_stream.transport)
supported_codecs = account.rtp.audio_codec_list or settings.rtp.audio_codec_list
common_codecs = [codec for codec in remote_stream.codec_list if codec in supported_codecs]
if not common_codecs:
raise InvalidStreamError("no compatible codecs found")
stream = cls(account)
stream._incoming_remote_sdp = remote_sdp
stream._incoming_stream_index = stream_index
stream._incoming_stream_has_srtp = remote_stream.has_srtp
stream._incoming_stream_has_srtp_forced = remote_stream.transport == 'RTP/SAVP'
if not stream._incoming_stream_has_srtp and account.rtp.srtp_encryption == "mandatory":
raise InvalidStreamError("SRTP is locally mandatory but it's not remotely enabled")
return stream
def initialize(self, session, direction):
with self._lock:
self._streams.append(self)
if self.state != "NULL":
raise RuntimeError("AudioStream.initialize() may only be called in the NULL state")
self.state = "INITIALIZING"
self._session = session
if hasattr(self, "_incoming_remote_sdp"):
# ICE attributes could come at the session level or at the media level
remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index]
self._try_ice = self.account.nat_traversal.use_ice and ((remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates)
self._use_srtp = self._incoming_stream_has_srtp and ((self._session.transport == "tls" or self.account.rtp.use_srtp_without_tls) and self.account.rtp.srtp_encryption != "disabled")
self._try_forced_srtp = self._incoming_stream_has_srtp_forced
if self._incoming_stream_has_srtp_forced and not self._use_srtp:
self.state = "ENDED"
self.notification_center.post_notification("MediaStreamDidFail", self,
TimestampedNotificationData(reason="SRTP is remotely mandatory but it's not locally enabled"))
return
del self._incoming_stream_has_srtp
del self._incoming_stream_has_srtp_forced
else:
self._try_ice = self.account.nat_traversal.use_ice
self._use_srtp = ((self._session.transport == "tls" or self.account.rtp.use_srtp_without_tls) and self.account.rtp.srtp_encryption != "disabled")
self._try_forced_srtp = self.account.rtp.srtp_encryption == "mandatory"
if self._try_ice:
if self.account.nat_traversal.stun_server_list:
# Assume these are IP addresses
stun_servers = list((server.host, server.port) for server in self.account.nat_traversal.stun_server_list)
self._init_rtp_transport(stun_servers)
elif not isinstance(self.account, BonjourAccount):
dns_lookup = DNSLookup()
self.notification_center.add_observer(self, sender=dns_lookup)
dns_lookup.lookup_service(SIPURI(self.account.id.domain), "stun")
else:
self._init_rtp_transport()
def get_local_media(self, for_offer):
with self._lock:
if self.state not in ["INITIALIZED", "WAIT_ICE", "ESTABLISHED"]:
raise RuntimeError("AudioStream.get_local_media() may only be " +
"called in the INITIALIZED, WAIT_ICE or ESTABLISHED states")
if for_offer:
old_direction = self._audio_transport.direction
if old_direction is None:
new_direction = "sendrecv"
elif "send" in old_direction:
new_direction = ("sendonly" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "sendrecv")
else:
new_direction = ("inactive" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "recvonly")
else:
new_direction = None
return self._audio_transport.get_local_media(for_offer, new_direction)
def start(self, local_sdp, remote_sdp, stream_index):
with self._lock:
if self.state != "INITIALIZED":
raise RuntimeError("AudioStream.start() may only be " +
"called in the INITIALIZED state")
settings = SIPSimpleSettings()
self._audio_transport.start(local_sdp, remote_sdp, stream_index, no_media_timeout=settings.rtp.timeout,
media_check_interval=settings.rtp.timeout)
self._check_hold(self._audio_transport.direction, True)
if self._try_ice:
self.state = 'WAIT_ICE'
else:
self.state = 'ESTABLISHED'
self.notification_center.post_notification("MediaStreamDidStart", self, TimestampedNotificationData())
def validate_update(self, remote_sdp, stream_index):
with self._lock:
# TODO: implement
return True
def update(self, local_sdp, remote_sdp, stream_index):
with self._lock:
if self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port:
settings = SIPSimpleSettings()
if self._audio_rec is not None:
self.bridge.remove(self._audio_rec)
old_consumer_slot = self.consumer_slot
old_producer_slot = self.producer_slot
self.notification_center.remove_observer(self, sender=self._audio_transport)
self._audio_transport.stop()
try:
self._audio_transport = AudioTransport(self.mixer, self._rtp_transport,
remote_sdp, stream_index,
codecs=(list(self.account.rtp.audio_codec_list)
if self.account.rtp.audio_codec_list else list(settings.rtp.audio_codec_list)))
except SIPCoreError, e:
self.state = "ENDED"
self.notification_center.post_notification("MediaStreamDidFail", self,
TimestampedNotificationData(reason=e.args[0]))
return
self.notification_center.add_observer(self, sender=self._audio_transport)
self._audio_transport.start(local_sdp, remote_sdp, stream_index, no_media_timeout=settings.rtp.timeout,
media_check_interval=settings.rtp.timeout)
self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=TimestampedNotificationData(consumer_slot_changed=True, producer_slot_changed=True,
old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot,
old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot))
self._check_hold(self._audio_transport.direction, True)
self.notification_center.post_notification("AudioStreamDidChangeRTPParameters", self, TimestampedNotificationData())
else:
new_direction = local_sdp.media[stream_index].direction
self._audio_transport.update_direction(new_direction)
self._check_hold(new_direction, False)
self._hold_request = None
def hold(self):
with self._lock:
if self.on_hold_by_local or self._hold_request == 'hold':
return
if self.state == "ESTABLISHED":
self.bridge.remove(self)
self._hold_request = 'hold'
def unhold(self):
with self._lock:
if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold':
return
if self.state == "ESTABLISHED" and self._hold_request == 'hold':
self.bridge.add(self)
self._hold_request = None if self._hold_request == 'hold' else 'unhold'
def deactivate(self):
pass
def end(self):
with self._lock:
if self.state != "ENDED":
if self._audio_transport is not None:
self.notification_center.post_notification("MediaStreamWillEnd", self,
TimestampedNotificationData())
if self._audio_rec is not None:
self._stop_recording()
self._audio_transport.stop()
self.notification_center.remove_observer(self, sender=self._audio_transport)
self._audio_transport = None
self._rtp_transport = None
self.state = "ENDED"
self.notification_center.post_notification("MediaStreamDidEnd", self,
TimestampedNotificationData())
else:
self.state = "ENDED"
self.bridge.stop()
self._session = None
def send_dtmf(self, digit):
with self._lock:
if self.state != "ESTABLISHED":
raise RuntimeError("AudioStream.send_dtmf() cannot be used in %s state" % self.state)
try:
self._audio_transport.send_dtmf(digit)
except PJSIPError, e:
if not e.args[0].endswith("(PJ_ETOOMANY)"):
raise
def start_recording(self, filename):
with self._lock:
if self.state != "ESTABLISHED":
raise RuntimeError("AudioStream.start_recording() may only be called in the ESTABLISHED state")
if self._audio_rec is not None:
raise RuntimeError("Already recording audio to a file")
self._audio_rec = WaveRecorder(self.mixer, filename)
self._check_recording()
def stop_recording(self):
with self._lock:
if self._audio_rec is None:
raise RuntimeError("Not recording any audio")
self._stop_recording()
# Notification handling
#
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_DNSLookupDidFail(self, notification):
with self._lock:
self.notification_center.remove_observer(self, sender=notification.sender)
if self.state == "ENDED":
return
self._init_rtp_transport()
def _NH_DNSLookupDidSucceed(self, notification):
with self._lock:
self.notification_center.remove_observer(self, sender=notification.sender)
if self.state == "ENDED":
return
self._init_rtp_transport(notification.data.result)
def _NH_RTPTransportDidFail(self, notification):
with self._lock:
self.notification_center.remove_observer(self, sender=notification.sender)
if self.state == "ENDED":
return
self._try_next_rtp_transport(notification.data.reason)
def _NH_RTPTransportDidInitialize(self, notification):
settings = SIPSimpleSettings()
rtp_transport = notification.sender
with self._lock:
if not rtp_transport.use_ice:
self.notification_center.remove_observer(self, sender=rtp_transport)
if self.state == "ENDED":
return
del self._rtp_args
del self._stun_servers
try:
if hasattr(self, "_incoming_remote_sdp"):
try:
audio_transport = AudioTransport(self.mixer, rtp_transport,
self._incoming_remote_sdp, self._incoming_stream_index,
codecs=(list(self.account.rtp.audio_codec_list)
if self.account.rtp.audio_codec_list else list(settings.rtp.audio_codec_list)))
finally:
del self._incoming_remote_sdp
del self._incoming_stream_index
else:
audio_transport = AudioTransport(self.mixer, rtp_transport,
codecs=(list(self.account.rtp.audio_codec_list)
if self.account.rtp.audio_codec_list else list(settings.rtp.audio_codec_list)))
except SIPCoreError, e:
self.state = "ENDED"
self.notification_center.post_notification("MediaStreamDidFail", self,
TimestampedNotificationData(reason=e.args[0]))
return
self._rtp_transport = rtp_transport
self._audio_transport = audio_transport
self.notification_center.add_observer(self, sender=audio_transport)
self.state = "INITIALIZED"
self.notification_center.post_notification("MediaStreamDidInitialize", self, TimestampedNotificationData())
def _NH_RTPAudioStreamGotDTMF(self, notification):
self.notification_center.post_notification("AudioStreamGotDTMF", self,
NotificationData(timestamp=notification.data.timestamp, digit=notification.data.digit))
+ def _NH_RTPAudioTransportDidTimeout(self, notification):
+ self.notification_center.post_notification("AudioStreamDidTimeout", self, TimestampedNotificationData())
+
def _NH_RTPTransportICENegotiationStateDidChange(self, notification):
self.notification_center.post_notification("AudioStreamICENegotiationStateDidChange", self, data=notification.data)
def _NH_RTPTransportICENegotiationDidSucceed(self, notification):
self._ice_state = "IN_USE"
rtp_transport = notification.sender
self.notification_center.remove_observer(self, sender=rtp_transport)
with self._lock:
if self.state != "WAIT_ICE":
return
self.notification_center.post_notification("AudioStreamICENegotiationDidSucceed", self, data=notification.data)
self.state = 'ESTABLISHED'
self.notification_center.post_notification("MediaStreamDidStart", self, TimestampedNotificationData())
def _NH_RTPTransportICENegotiationDidFail(self, notification):
self._ice_state = "FAILED"
rtp_transport = notification.sender
self.notification_center.remove_observer(self, sender=rtp_transport)
with self._lock:
if self.state != "WAIT_ICE":
return
self.notification_center.post_notification("AudioStreamICENegotiationDidFail", self, data=notification.data)
self.state = 'ESTABLISHED'
self.notification_center.post_notification("MediaStreamDidStart", self, TimestampedNotificationData())
# Private methods
#
def _init_rtp_transport(self, stun_servers=None):
self._rtp_args = dict()
self._rtp_args["use_srtp"] = self._use_srtp
self._rtp_args["srtp_forced"] = self._use_srtp and self._try_forced_srtp
self._rtp_args["use_ice"] = self._try_ice
self._stun_servers = [(None, None)]
if stun_servers:
self._stun_servers.extend(reversed(stun_servers))
self._try_next_rtp_transport()
def _try_next_rtp_transport(self, failure_reason=None):
# TODO: log failure_reason if it is not None? Or send a notification?
if self._stun_servers:
stun_ip, stun_port = self._stun_servers.pop()
observer_added = False
try:
rtp_transport = RTPTransport(ice_stun_address=stun_ip, ice_stun_port=stun_port, **self._rtp_args)
self.notification_center.add_observer(self, sender=rtp_transport)
observer_added = True
rtp_transport.set_INIT()
except SIPCoreError, e:
if observer_added:
self.notification_center.remove_observer(self, sender=rtp_transport)
self._try_next_rtp_transport(e.args[0])
else:
self.state = "ENDED"
self.notification_center.post_notification("MediaStreamDidFail", self,
TimestampedNotificationData(reason=failure_reason))
def _check_hold(self, direction, is_initial):
was_on_hold_by_local = self.on_hold_by_local
was_on_hold_by_remote = self.on_hold_by_remote
self.on_hold_by_local = "recv" not in direction
self.on_hold_by_remote = "send" not in direction
if (is_initial or was_on_hold_by_local) and not self.on_hold_by_local and self._hold_request != 'hold':
self.bridge.add(self)
if not was_on_hold_by_local and self.on_hold_by_local:
self.notification_center.post_notification("AudioStreamDidChangeHoldState", self,
TimestampedNotificationData(originator="local", on_hold=True))
if was_on_hold_by_local and not self.on_hold_by_local:
self.notification_center.post_notification("AudioStreamDidChangeHoldState", self,
TimestampedNotificationData(originator="local", on_hold=False))
if not was_on_hold_by_remote and self.on_hold_by_remote:
self.notification_center.post_notification("AudioStreamDidChangeHoldState", self,
TimestampedNotificationData(originator="remote", on_hold=True))
if was_on_hold_by_remote and not self.on_hold_by_remote:
self.notification_center.post_notification("AudioStreamDidChangeHoldState", self,
TimestampedNotificationData(originator="remote", on_hold=False))
if self._audio_rec is not None:
self._check_recording()
def _check_recording(self):
if not self._audio_rec.is_active:
self.notification_center.post_notification("AudioStreamWillStartRecordingAudio", self,
TimestampedNotificationData(filename=self._audio_rec.filename))
try:
self._audio_rec.start()
except SIPCoreError, e:
self._audio_rec = None
self.notification_center.post_notification("AudioStreamDidStopRecordingAudio", self,
TimestampedNotificationData(filename=self._audio_rec.filename, reason=e.args[0]))
return
self.notification_center.post_notification("AudioStreamDidStartRecordingAudio", self,
TimestampedNotificationData(filename=self._audio_rec.filename))
if not self.on_hold:
self.bridge.add(self._audio_rec)
elif self._audio_rec in self.bridge:
self.bridge.remove(self._audio_rec)
def _stop_recording(self):
self.notification_center.post_notification("AudioStreamWillStopRecordingAudio", self,
TimestampedNotificationData(filename=self._audio_rec.filename))
try:
if self._audio_rec.is_active:
self._audio_rec.stop()
finally:
self.notification_center.post_notification("AudioStreamDidStopRecordingAudio", self,
TimestampedNotificationData(filename=self._audio_rec.filename))
self._audio_rec = None

File Metadata

Mime Type
text/x-diff
Expires
Sat, Feb 1, 5:26 PM (1 d, 16 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3489422
Default Alt Text
(71 KB)

Event Timeline