Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/xmppgateway/xmpp/jingle/session.py b/sylk/applications/xmppgateway/xmpp/jingle/session.py
index 01df3ff..4eefba4 100644
--- a/sylk/applications/xmppgateway/xmpp/jingle/session.py
+++ b/sylk/applications/xmppgateway/xmpp/jingle/session.py
@@ -1,773 +1,783 @@
# Copyright (C) 2013 AG Projects. See LICENSE for details
#
import random
import string
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from cStringIO import StringIO
from datetime import datetime
from eventlib import api, coros, proc
from eventlib.twistedutil import block_on
from lxml import etree
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SDPSession, SDPMediaStream, SDPConnection, SDPNegotiator
from sipsimple.core import SIPCoreError
from sipsimple.threading import run_in_twisted_thread
from twisted.internet import reactor
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import TimeoutError as IqTimeoutError
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI
from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError
from sylk.applications.xmppgateway.xmpp.jingle.util import jingle_to_sdp, sdp_to_jingle
from sylk.applications.xmppgateway.xmpp.stanzas import jingle
from sylk.configuration import SIPConfig
def random_id(length=32):
    """Return a random identifier made of ASCII letters and digits.

    The result is used as the Jingle session id for outgoing sessions
    (see ``JingleSession._OH_ConnectOperation``).

    :param length: number of characters to generate (32 by default, which is
        what every existing caller gets).
    :returns: a ``str`` of exactly ``length`` alphanumeric characters.
    """
    alphabet = string.ascii_letters + string.digits
    # SystemRandom draws from the OS entropy source, so ids cannot be predicted
    # from the state of the module-level (seedable) generator.
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(length))
class MediaStreamDidFailError(Exception):
    """Signals, inside a running operation, that an observed media stream failed.

    ``JingleSession._NH_MediaStreamDidFail`` injects this into the current
    operation's channel so the operation handler can abort and clean up.

    Attributes:
        stream: the notification sender (the stream that failed).
        data: the notification data; handlers read ``data.reason`` from it.
    """

    def __init__(self, stream, data):
        # Forward the arguments to Exception so args/repr()/str() carry the
        # failure details instead of being empty.
        Exception.__init__(self, stream, data)
        self.stream = stream
        self.data = data
# Exception added by this diff: signals, inside a running operation, that an
# observed media stream failed while initializing. It is injected into the
# current operation's channel by JingleSession._NH_MediaStreamDidNotInitialize.
#   stream -- the notification sender (the stream that did not initialize)
#   data   -- the notification data; operation handlers read data.reason from it
+class MediaStreamDidNotInitializeError(Exception):
+ def __init__(self, stream, data):
+ self.stream = stream
+ self.data = data
+
+
class Operation(object):
    """Base class for the units of work queued on a JingleSession.

    Subclasses list their mandatory keyword arguments in ``__params__``; every
    keyword argument passed to the constructor becomes an instance attribute.
    Each operation also owns a ``channel`` queue through which notifications
    (or exceptions) are delivered to the handler while the operation runs.

    :raises ValueError: if a name listed in ``__params__`` was not supplied.
    """

    __params__ = ()

    def __init__(self, **params):
        # Validate first and in declaration order: the reported parameter is
        # deterministic and a rejected operation is never left half-populated.
        for name in self.__params__:
            if name not in params:
                raise ValueError("missing operation parameter: '%s'" % name)
        for name, value in params.items():
            setattr(self, name, value)
        self.channel = coros.queue()
# Concrete operations. JingleSession._run dispatches each one to the method
# named '_OH_<ClassName>', so the class names are part of the contract.

class AcceptOperation(Operation):
    """Accept an incoming session with the given streams."""
    __params__ = ('streams', 'is_focus')


class SendRingIndicationOperation(Operation):
    """Send a 'ringing' session-info to the remote party."""
    __params__ = ()


class RejectOperation(Operation):
    """Reject an incoming session with the given reason."""
    __params__ = ('reason',)


class EndOperation(Operation):
    """Terminate a connecting or connected session."""
    __params__ = ()


class HoldOperation(Operation):
    """Put a connected session on hold."""
    __params__ = ()


class UnholdOperation(Operation):
    """Take a connected session off hold."""
    __params__ = ()


class ProcessRemoteOperation(Operation):
    """Process a Jingle stanza received from the remote party."""
    __params__ = ('notification',)


class ConnectOperation(Operation):
    """Start an outgoing session."""
    __params__ = ('sender', 'recipient', 'streams', 'is_focus')


class SendConferenceInfoOperation(Operation):
    """Send a conference-info XML document to the remote party."""
    __params__ = ('xml',)
class JingleSession(object):
implements(IObserver)
jingle_stanza_timeout = 3
media_stream_timeout = 15
def __init__(self, protocol):
    """Create an idle session that sends its stanzas through *protocol*.

    A worker proc running ``_run`` is spawned immediately; it consumes the
    operations queued on ``self._channel``.
    """
    self.account = DefaultAccount()
    self._protocol = protocol

    # Session id and parties; filled in by init_incoming() or by the connect operation.
    self._id = None
    self._local_identity = self._remote_identity = None
    self._local_jid = self._remote_jid = None

    # Operation machinery.
    self._channel = coros.queue()
    self._current_operation = None
    self._proc = proc.spawn(self._run)
    self._timer = None

    # Negotiation state.
    self._sdp_negotiator = None
    self._pending_transport_info_stanzas = []

    # Publicly visible session state.
    self.direction = self.state = None
    self.streams = self.proposed_streams = None
    self.start_time = self.end_time = None
    self.on_hold = False
    self.local_focus = False
def init_incoming(self, stanza):
    """Set the session up from a received stanza carrying the remote offer.

    Records the session id and both parties, creates the SDP negotiator from
    the remote offer and builds one proposed stream per enabled media line
    that a registered stream type can handle. Posts
    ``JingleSessionNewIncoming`` when at least one stream was built; otherwise
    the session is failed.
    """
    offer = stanza.jingle
    self._id = offer.sid
    self._local_identity = Identity(FrozenURI.parse(stanza.recipient))
    self._remote_identity = Identity(FrozenURI.parse(stanza.sender))
    self._local_jid = self._local_identity.uri.as_xmpp_jid()
    self._remote_jid = self._remote_identity.uri.as_xmpp_jid()

    remote_sdp = jingle_to_sdp(offer)
    try:
        self._sdp_negotiator = SDPNegotiator.create_with_remote_offer(remote_sdp)
    except SIPCoreError as e:
        self._fail(originator='local', reason='general-error', description=str(e))
        return

    self.proposed_streams = []
    for index, media_stream in enumerate(remote_sdp.media):
        if media_stream.port == 0:
            # Media line with port 0: nothing to build for it.
            continue
        for stream_type in MediaStreamRegistry():
            try:
                stream = stream_type.new_from_sdp(self, remote_sdp, index)
            except InvalidStreamError:
                # This type recognised the media but rejected it: stop trying.
                break
            except UnknownStreamError:
                # Not this type's kind of media: try the next registered type.
                continue
            stream.index = index
            self.proposed_streams.append(stream)
            break

    if not self.proposed_streams:
        self._fail(originator='local', reason='unsupported-applications')
        return

    self.direction = 'incoming'
    self.state = 'incoming'
    NotificationCenter().post_notification('JingleSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams))
def connect(self, sender_identity, recipient_identity, streams, is_focus=False):
    """Queue an outgoing session attempt from *sender_identity* to *recipient_identity*."""
    operation = ConnectOperation(sender=sender_identity, recipient=recipient_identity, streams=streams, is_focus=is_focus)
    self._schedule_operation(operation)

def send_ring_indication(self):
    """Queue sending a ringing indication for an incoming session."""
    operation = SendRingIndicationOperation()
    self._schedule_operation(operation)

def accept(self, streams, is_focus=False):
    """Queue accepting an incoming session with the given *streams*."""
    operation = AcceptOperation(streams=streams, is_focus=is_focus)
    self._schedule_operation(operation)

def reject(self, reason='busy'):
    """Queue rejecting an incoming session with *reason*."""
    operation = RejectOperation(reason=reason)
    self._schedule_operation(operation)

def hold(self):
    """Queue putting the session on hold."""
    operation = HoldOperation()
    self._schedule_operation(operation)

def unhold(self):
    """Queue taking the session off hold."""
    operation = UnholdOperation()
    self._schedule_operation(operation)

def end(self):
    """Queue terminating the session."""
    operation = EndOperation()
    self._schedule_operation(operation)

def add_stream(self):
    """Not implemented."""
    raise NotImplementedError

def remove_stream(self):
    """Not implemented."""
    raise NotImplementedError
@property
def id(self):
    """The Jingle session id, or None until the session has been set up."""
    session_id = self._id
    return session_id

@property
def local_identity(self):
    """Identity of the local party, or None until the session has been set up."""
    identity = self._local_identity
    return identity

@property
def remote_identity(self):
    """Identity of the remote party, or None until the session has been set up."""
    identity = self._remote_identity
    return identity
@run_in_twisted_thread
def _send_conference_info(self, xml):
    """Queue sending the conference-info document *xml* to the remote party."""
    # This function is not meant for users to call, entities with knowledge about JingleSession
    # internals will call it, such as the MediaSessionHandler
    operation = SendConferenceInfoOperation(xml=xml)
    self._schedule_operation(operation)
def _send_stanza(self, stanza):
    """Fill in initiator/responder and the timeout on *stanza* and send it.

    For incoming sessions the remote party is the initiator; otherwise we are.
    Returns whatever ``self._protocol.request`` returns.
    """
    if self.direction == 'incoming':
        initiator, responder = self._remote_jid, self._local_jid
    else:
        initiator, responder = self._local_jid, self._remote_jid
    stanza.jingle.initiator = unicode(initiator)
    stanza.jingle.responder = unicode(responder)
    stanza.timeout = self.jingle_stanza_timeout
    return self._protocol.request(stanza)
def _fail(self, originator='local', reason='general-error', description=None):
    """Terminate the session because of a failure and notify observers.

    Sends a session-terminate stanza with the given reason, marks the session
    as 'terminated', posts ``JingleSessionDidFail`` and stops the worker proc
    by injecting ``proc.ProcExit`` into the operation channel.

    :param originator: which side caused the failure ('local' or 'remote');
        reported verbatim in the notification data.
    :param reason: name of the reason, passed to ``jingle.ReasonType``.
    :param description: optional human readable text, sent along with the
        reason and appended to the reported failure string.
    """
    reason = jingle.Reason(jingle.ReasonType(reason), text=description)
    stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason)
    self._send_stanza(stanza)
    self.state = 'terminated'
    # NOTE(review): 'reason' is the jingle.Reason object at this point, so the
    # string below depends on how that class formats itself -- confirm.
    failure_str = '%s%s' % (reason, ' %s' % description if description else '')
    # Report the originator the caller passed in; it used to be hard-coded to
    # 'local', which misreported failures flagged as originator='remote'.
    NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator=originator, reason=failure_str))
    self._channel.send_exception(proc.ProcExit)
@run_in_twisted_thread
def handle_notification(self, notification):
    """Dispatch *notification* to the matching ``_NH_<name>`` method, if any."""
    handler_name = '_NH_%s' % notification.name
    # Null is the fallback handler used for notifications without one.
    getattr(self, handler_name, Null)(notification)
def _NH_MediaStreamDidInitialize(self, notification):
    """Forward the notification to the operation currently running, if any."""
    current = self._current_operation
    if current is None:
        return
    current.channel.send(notification)
# Handler added by this diff: when a stream reports that it could not be
# initialized while an operation is running, raise
# MediaStreamDidNotInitializeError inside that operation (through its channel).
# Nothing is done when no operation is in progress.
+ def _NH_MediaStreamDidNotInitialize(self, notification):
+ if self._current_operation is not None:
+ self._current_operation.channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data))
+
def _NH_MediaStreamDidStart(self, notification):
    """Forward the notification to the operation currently running, if any."""
    current = self._current_operation
    if current is None:
        return
    current.channel.send(notification)

def _NH_MediaStreamDidFail(self, notification):
    """Abort the running operation with the failure, or end the session if idle."""
    current = self._current_operation
    if current is None:
        # No operation is waiting on the stream: terminate the session.
        self.end()
        return
    failure = MediaStreamDidFailError(notification.sender, notification.data)
    current.channel.send_exception(failure)
# Every stanza received from the remote party is queued as a
# ProcessRemoteOperation; _OH_ProcessRemoteOperation tells them apart by
# notification name.

def _NH_XMPPGotJingleSessionAccept(self, notification):
    remote_operation = ProcessRemoteOperation(notification=notification)
    self._schedule_operation(remote_operation)

def _NH_XMPPGotJingleSessionTerminate(self, notification):
    remote_operation = ProcessRemoteOperation(notification=notification)
    self._schedule_operation(remote_operation)

def _NH_XMPPGotJingleSessionInfo(self, notification):
    remote_operation = ProcessRemoteOperation(notification=notification)
    self._schedule_operation(remote_operation)

def _NH_XMPPGotJingleDescriptionInfo(self, notification):
    remote_operation = ProcessRemoteOperation(notification=notification)
    self._schedule_operation(remote_operation)

def _NH_XMPPGotJingleTransportInfo(self, notification):
    remote_operation = ProcessRemoteOperation(notification=notification)
    self._schedule_operation(remote_operation)
# Operation handling
@run_in_twisted_thread
def _schedule_operation(self, operation):
    """Append *operation* to the queue consumed by the worker proc (``_run``)."""
    queue = self._channel
    queue.send(operation)
def _run(self):
    """Worker loop: take operations off the queue and run their handlers.

    The handler for an operation is the method named ``_OH_<ClassName>``.
    Any exception escaping a handler clears ``self._proc`` and propagates,
    which ends the loop.
    """
    while True:
        operation = self._channel.wait()
        self._current_operation = operation
        try:
            handler_name = '_OH_%s' % operation.__class__.__name__
            run_handler = getattr(self, handler_name)
            run_handler(operation)
        except BaseException:
            self._proc = None
            raise
        finally:
            # Whatever happened, no operation is in progress any more.
            self._current_operation = None
def _OH_AcceptOperation(self, operation):
# Operation handler: accept an incoming session.
# Outline: initialize the accepted streams, build the local SDP answer from the
# remote offer, send session-accept, start the streams and wait until each one
# reports MediaStreamDidStart. A stream failure, a stanza error or a timeout
# tears the streams down and fails the session with 'failed-application'.
# Ignored unless the session is still in the 'incoming' state.
if self.state != 'incoming':
return
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
streams = operation.streams
# Only the proposed streams the caller chose to accept are observed and initialized.
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
try:
# NOTE(review): this counts every proposed stream, while only those present in
# operation.streams were initialized above; if a subset is accepted the loop
# below would wait for notifications that never arrive -- confirm.
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
remote_sdp = self._sdp_negotiator.current_remote
local_ip = SIPConfig.local_ip.normalized
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
# The answer carries one media line per line of the offer: the stream's own media
# when we have a stream for that index, otherwise a copy of the remote line with
# port 0 and no attributes.
for index, media in enumerate(remote_sdp.media):
stream = stream_map.get(index, None)
if stream is not None:
- media = stream.get_local_media(for_offer=False)
+ media = stream.get_local_media(remote_sdp=remote_sdp, index=index)
else:
media = SDPMediaStream.new(media)
media.port = 0
media.attributes = []
local_sdp.media.append(media)
try:
self._sdp_negotiator.set_local_answer(local_sdp)
self._sdp_negotiator.negotiate()
except SIPCoreError, e:
self._fail(originator='local', reason='incompatible-parameters', description=str(e))
return
self.local_focus = operation.is_focus
notification_center.post_notification('JingleSessionWillStart', sender=self)
# Get active SDPs (negotiator may make changes)
local_sdp = self._sdp_negotiator.active_local
remote_sdp = self._sdp_negotiator.active_remote
# Build the payload and send it over
payload = sdp_to_jingle(local_sdp)
payload.sid = self._id
if self.local_focus:
payload.conference_info = jingle.ConferenceInfo(True)
stanza = self._protocol.sessionAccept(self._local_jid, self._remote_jid, payload)
d = self._send_stanza(stanza)
# Wait for the result of the request; IqTimeoutError and StanzaError are handled below.
block_on(d)
wait_count = 0
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
# Start the streams whose remote media line has a non-zero port; drop the others.
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map.get(index, None)
if stream is not None:
if remote_media.port:
wait_count += 1
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
# Streams whose index lies beyond the media lines of the negotiated local SDP are dropped too.
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
# Give the started streams at most media_stream_timeout to report MediaStreamDidStart.
with api.timeout(self.media_stream_timeout):
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
- except (MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError), e:
+ except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError), e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
# Turn the exception into the description sent with the session-terminate.
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
elif isinstance(e, IqTimeoutError):
error = 'timeout sending IQ stanza'
elif isinstance(e, StanzaError):
error = str(e.condition)
else:
# Remaining cases are the media stream exceptions, which carry the notification data.
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', reason='failed-application', description=error)
else:
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification_center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams))
def _OH_ConnectOperation(self, operation):
# Operation handler: start an outgoing session.
# Outline: record the parties, initialize the proposed streams, build the local
# SDP offer and send session-initiate. On failure the streams are ended and
# JingleSessionDidFail is posted (no session-terminate is sent from here).
# On success a timer is armed that ends the attempt unless it is cancelled.
# Ignored when the session has already been set up (state is not None).
if self.state is not None:
return
settings = SIPSimpleSettings()
notification_center = NotificationCenter()
self.direction = 'outgoing'
self.state = 'connecting'
self.proposed_streams = operation.streams
self.local_focus = operation.is_focus
self._id = random_id()
self._local_identity = operation.sender
self._remote_identity = operation.recipient
self._local_jid = self._local_identity.uri.as_xmpp_jid()
self._remote_jid = self._remote_identity.uri.as_xmpp_jid()
notification_center.post_notification('JingleSessionNewOutgoing', self, NotificationData(streams=operation.streams))
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
# Wait until every proposed stream has reported MediaStreamDidInitialize;
# stream failures arrive here as exceptions raised by channel.wait().
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
# Build local SDP and negotiator
local_ip = SIPConfig.local_ip.normalized
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
# A stream's index is its position among the media lines of the offer.
for index, stream in enumerate(self.proposed_streams):
stream.index = index
- media = stream.get_local_media(for_offer=True)
+ media = stream.get_local_media(remote_sdp=None, index=index)
local_sdp.media.append(media)
self._sdp_negotiator = SDPNegotiator.create_with_local_offer(local_sdp)
# Build the payload and send it over
payload = sdp_to_jingle(local_sdp)
payload.sid = self._id
if self.local_focus:
payload.conference_info = jingle.ConferenceInfo(True)
stanza = self._protocol.sessionInitiate(self._local_jid, self._remote_jid, payload)
d = self._send_stanza(stanza)
# Wait for the result of the request; IqTimeoutError and StanzaError are handled below.
block_on(d)
- except (MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError), e:
+ except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError), e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
# Turn the exception into the reason reported in JingleSessionDidFail.
if isinstance(e, IqTimeoutError):
error = 'timeout sending IQ stanza'
elif isinstance(e, StanzaError):
error = str(e.condition)
elif isinstance(e, SIPCoreError):
error = str(e)
else:
# Remaining cases are the media stream exceptions, which carry the notification data.
error = 'media stream failed: %s' % e.data.reason
self.state = 'terminated'
NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=error))
# Stop the worker proc: the next wait on the operation channel raises ProcExit.
self._channel.send_exception(proc.ProcExit)
else:
# End the attempt after settings.sip.invite_timeout unless the timer is cancelled
# first (done when a terminate, description-info or session-accept is processed).
self._timer = reactor.callLater(settings.sip.invite_timeout, self.end)
def _OH_RejectOperation(self, operation):
    """Reject an incoming session: send session-terminate and stop the worker.

    Ignored unless the session is still in the 'incoming' state.
    """
    if self.state == 'incoming':
        reject_reason = jingle.Reason(jingle.ReasonType(operation.reason))
        terminate = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reject_reason)
        self._send_stanza(terminate)
        self.state = 'terminated'
        self._channel.send_exception(proc.ProcExit)
def _OH_EndOperation(self, operation):
    """Terminate a connecting or connected session from the local side.

    Deactivates the streams, sends session-terminate ('success' when the
    session was connected, 'cancel' otherwise), posts ``JingleSessionDidEnd``
    or ``JingleSessionDidFail`` accordingly, ends the streams and stops the
    worker proc. Ignored in any other state.
    """
    if self.state not in ('connecting', 'connected'):
        return

    timer = self._timer
    if timer is not None and timer.active():
        timer.cancel()
    self._timer = None

    was_connected = self.state == 'connected'
    self.state = 'terminating'

    notification_center = NotificationCenter()
    notification_center.post_notification('JingleSessionWillEnd', self)

    # Only streams we are still observing are deactivated now and ended later.
    observed_streams = []
    for stream in (self.streams or []) + (self.proposed_streams or []):
        try:
            notification_center.remove_observer(self, sender=stream)
        except KeyError:
            continue
        stream.deactivate()
        observed_streams.append(stream)

    reason_name = 'success' if was_connected else 'cancel'
    reason = jingle.Reason(jingle.ReasonType(reason_name))
    terminate = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason)
    self._send_stanza(terminate)

    self.state = 'terminated'
    if was_connected:
        self.end_time = datetime.now()
        notification_center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='local'))
    else:
        notification_center.post_notification('JingleSessionDidFail', self, NotificationData(originator='local', reason='cancel'))

    for stream in observed_streams:
        stream.end()
    self._channel.send_exception(proc.ProcExit)
def _OH_SendRingIndicationOperation(self, operation):
    """Send a 'ringing' session-info; ignored unless the session is 'incoming'."""
    if self.state == 'incoming':
        ringing = jingle.Info('ringing')
        info_stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, ringing)
        self._send_stanza(info_stanza)
def _OH_HoldOperation(self, operation):
    """Put every stream on hold and tell the remote party.

    Ignored unless the session is connected and not already on hold.
    """
    if self.state != 'connected' or self.on_hold:
        return
    self.on_hold = True
    for stream in self.streams:
        stream.hold()
    info_stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('hold'))
    self._send_stanza(info_stanza)
    notification_center = NotificationCenter()
    hold_data = NotificationData(originator='local', on_hold=True, partial=False)
    notification_center.post_notification('JingleSessionDidChangeHoldState', self, hold_data)
def _OH_UnholdOperation(self, operation):
    """Take every stream off hold and tell the remote party.

    Ignored unless the session is connected and currently on hold.
    """
    if self.state != 'connected' or not self.on_hold:
        return
    self.on_hold = False
    for stream in self.streams:
        stream.unhold()
    info_stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('unhold'))
    self._send_stanza(info_stanza)
    notification_center = NotificationCenter()
    hold_data = NotificationData(originator='local', on_hold=False, partial=False)
    notification_center.post_notification('JingleSessionDidChangeHoldState', self, hold_data)
def _OH_SendConferenceInfoOperation(self, operation):
    """Send the conference-info document carried by *operation* to the remote party.

    Ignored unless the session is connected and we are the conference focus.
    The request is sent without waiting for its result.
    """
    if self.state != 'connected' or not self.local_focus:
        return
    document = etree.parse(StringIO(operation.xml))
    root = document.getroot()
    root.attrib['sid'] = self._id  # FIXME: non-standard, but Jitsi does it
    payload = etree.tostring(document, xml_declaration=False)  # Strip the XML heading
    stanza = jingle.ConferenceInfoIq(sender=self._local_jid, recipient=self._remote_jid, payload=payload)
    stanza.timeout = self.jingle_stanza_timeout
    self._protocol.request(stanza)
def _OH_ProcessRemoteOperation(self, operation):
# Operation handler for stanzas received from the remote party. The branch taken
# depends on the name of the notification that delivered the stanza:
#   XMPPGotJingleSessionTerminate -- the remote party ended the session
#   XMPPGotJingleSessionInfo      -- ringing / hold / unhold indications
#   XMPPGotJingleDescriptionInfo  -- early answer: negotiate and start media, held
#   XMPPGotJingleSessionAccept    -- final answer: negotiate and start media
#   XMPPGotJingleTransportInfo    -- candidates collected until the answer arrives
# Each branch returns silently when the session is not in a state it applies to.
notification = operation.notification
stanza = notification.data.stanza
if notification.name == 'XMPPGotJingleSessionTerminate':
if self.state not in ('incoming', 'connecting', 'connected_pending_accept', 'connected'):
return
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
# Session ended remotely
prev_state = self.state
self.state = 'terminated'
if prev_state == 'incoming':
# We had not accepted yet: report a failure, using 'cancel' when no reason was sent.
reason = stanza.jingle.reason.value if stanza.jingle.reason else 'cancel'
notification.center.post_notification('JingleSessionDidFail', self, NotificationData(originator='remote', reason=reason))
else:
notification.center.post_notification('JingleSessionWillEnd', self, NotificationData(originator='remote'))
# While connecting the streams are still only proposed.
streams = self.proposed_streams if prev_state == 'connecting' else self.streams
for stream in streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.end_time = datetime.now()
notification.center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='remote'))
# Stop the worker proc: the next wait on the operation channel raises ProcExit.
self._channel.send_exception(proc.ProcExit)
elif notification.name == 'XMPPGotJingleSessionInfo':
info = stanza.jingle.info
if not info:
return
if info == 'ringing':
if self.state not in ('connecting', 'connected_pending_accept'):
return
notification.center.post_notification('JingleSessionGotRingIndication', self)
elif info in ('hold', 'unhold'):
if self.state != 'connected':
return
notification.center.post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=info=='hold', partial=False))
elif notification.name == 'XMPPGotJingleDescriptionInfo':
if self.state != 'connecting':
return
# Add candidates acquired on transport-info stanzas
for s in self._pending_transport_info_stanzas:
for c in s.jingle.content:
# Find the content with the same name in this stanza (next() raises StopIteration if there is none).
content = next(content for content in stanza.jingle.content if content.name == c.name)
content.transport.candidates.extend(c.transport.candidates)
if isinstance(content.transport, jingle.IceUdpTransport):
# Fill in the ICE credentials only when this stanza does not carry its own.
if not content.transport.ufrag and c.transport.ufrag:
content.transport.ufrag = c.transport.ufrag
if not content.transport.password and c.transport.password:
content.transport.password = c.transport.password
remote_sdp = jingle_to_sdp(stanza.jingle)
try:
self._sdp_negotiator.set_remote_answer(remote_sdp)
self._sdp_negotiator.negotiate()
except SIPCoreError:
# The description-info stanza may have been just a parameter change, not a full 'SDP'
return
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
del self._pending_transport_info_stanzas[:]
# Get active SDPs (negotiator may make changes)
local_sdp = self._sdp_negotiator.active_local
remote_sdp = self._sdp_negotiator.active_remote
notification.center.post_notification('JingleSessionWillStart', sender=self)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
# Start the streams whose remote media line has a non-zero port; drop the others.
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
stream.start(local_sdp, remote_sdp, index)
else:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
# Streams whose index lies beyond the media lines of the negotiated local SDP are dropped too.
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
try:
# Give the remaining streams at most media_stream_timeout to report MediaStreamDidStart.
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
# Note: this rebinds 'notification' to the stream notification just received.
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
except (MediaStreamDidFailError, api.TimeoutError), e:
for stream in self.proposed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', reason='failed-application', description=error)
else:
# Media is up but session-accept has not arrived yet.
self.state = 'connected_pending_accept'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
# Hold the streams to prevent real RTP from flowing
for stream in self.streams:
stream.hold()
elif notification.name == 'XMPPGotJingleSessionAccept':
if self.state not in ('connecting', 'connected_pending_accept'):
return
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
if self.state == 'connected_pending_accept':
# We already negotiated ICE and media is 'flowing' (not really because streams are on hold)
# unhold the streams and pretend the session just started
for stream in self.streams:
stream.unhold()
self.state = 'connected'
notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams))
return
# Add candidates acquired on transport-info stanzas
for s in self._pending_transport_info_stanzas:
for c in s.jingle.content:
# Find the content with the same name in this stanza (next() raises StopIteration if there is none).
content = next(content for content in stanza.jingle.content if content.name == c.name)
content.transport.candidates.extend(c.transport.candidates)
if isinstance(content.transport, jingle.IceUdpTransport):
# Fill in the ICE credentials only when this stanza does not carry its own.
if not content.transport.ufrag and c.transport.ufrag:
content.transport.ufrag = c.transport.ufrag
if not content.transport.password and c.transport.password:
content.transport.password = c.transport.password
del self._pending_transport_info_stanzas[:]
remote_sdp = jingle_to_sdp(stanza.jingle)
try:
self._sdp_negotiator.set_remote_answer(remote_sdp)
self._sdp_negotiator.negotiate()
except SIPCoreError, e:
# Unlike description-info, a session-accept that cannot be negotiated is fatal.
for stream in self.proposed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', reason='incompatible-parameters', description=str(e))
return
# Get active SDPs (negotiator may make changes)
local_sdp = self._sdp_negotiator.active_local
remote_sdp = self._sdp_negotiator.active_remote
notification.center.post_notification('JingleSessionWillStart', sender=self)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
# Start the streams whose remote media line has a non-zero port; drop the others.
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
stream.start(local_sdp, remote_sdp, index)
else:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
# Streams whose index lies beyond the media lines of the negotiated local SDP are dropped too.
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
try:
# Give the remaining streams at most media_stream_timeout to report MediaStreamDidStart.
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
# Note: this rebinds 'notification' to the stream notification just received.
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
except (MediaStreamDidFailError, api.TimeoutError), e:
for stream in self.proposed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', reason='failed-application', description=error)
else:
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams))
elif notification.name == 'XMPPGotJingleTransportInfo':
if self.state != 'connecting':
# ICE trickling not supported yet, so only accept candidates before accept
return
# Kept until a description-info or session-accept stanza is processed, which merges them in.
self._pending_transport_info_stanzas.append(stanza)
class JingleSessionManager(object):
    """Singleton registry of the live Jingle sessions, keyed by session id.

    Between ``start()`` and ``stop()`` the manager observes session
    notifications: a session is added to ``sessions`` when it is announced
    (``JingleSessionNewIncoming`` / ``JingleSessionNewOutgoing``) and removed
    when it fails or ends.
    """

    __metaclass__ = Singleton

    implements(IObserver)

    # Notifications the manager subscribes to while it is started.
    _observed_notifications = ('JingleSessionNewIncoming',
                               'JingleSessionNewOutgoing',
                               'JingleSessionDidFail',
                               'JingleSessionDidEnd')

    def __init__(self):
        self.sessions = {}

    def start(self):
        """Start tracking sessions."""
        notification_center = NotificationCenter()
        for name in self._observed_notifications:
            notification_center.add_observer(self, name=name)

    def stop(self):
        """Stop tracking sessions; already registered sessions are left in place."""
        notification_center = NotificationCenter()
        for name in self._observed_notifications:
            notification_center.remove_observer(self, name=name)

    def handle_notification(self, notification):
        """Register or forget the session that sent *notification*."""
        if notification.name in ('JingleSessionNewIncoming', 'JingleSessionNewOutgoing'):
            session = notification.sender
            self.sessions[session.id] = session
        elif notification.name in ('JingleSessionDidFail', 'JingleSessionDidEnd'):
            session = notification.sender
            # A session can fail without ever having been announced (for instance
            # JingleSession.init_incoming fails it before JingleSessionNewIncoming
            # is posted), so a missing entry is not an error.
            self.sessions.pop(session.id, None)
diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py
index 4d5991d..45968df 100644
--- a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py
+++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py
@@ -1,439 +1,444 @@
# Copyright (C) 2009-2011 AG Projects. See LICENSE for details.
#
"""
Handling of RTP media streams according to RFC3550, RFC3605, RFC3581,
RFC2833 and RFC3711, RFC3489 and draft-ietf-mmusic-ice-19.
"""
__all__ = ['AudioStream']
from threading import RLock
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from zope.interface import implements
from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import AudioTransport, PJSIPError, RTPTransport, SIPCoreError
from sylk.applications.xmppgateway.xmpp.jingle.streams import IMediaStream, InvalidStreamError, MediaStreamRegistrar, UnknownStreamError
class AudioStream(object):
__metaclass__ = MediaStreamRegistrar
implements(IMediaStream, IAudioPort, IObserver)
_streams = []
type = 'audio'
priority = 1
hold_supported = True
def __init__(self):
# Create an idle audio stream in the "NULL" state: an audio bridge and an audio
# device on the application's voice mixer, no RTP/audio transport yet, and all
# hold, ICE and SRTP flags cleared.
# Imported here rather than at module level -- presumably to avoid an import cycle; confirm.
from sipsimple.application import SIPApplication
self.mixer = SIPApplication.voice_audio_mixer
self.bridge = AudioBridge(self.mixer)
self.device = AudioDevice(self.mixer)
self.notification_center = NotificationCenter()
self.on_hold_by_local = False
self.on_hold_by_remote = False
self.direction = None
self.state = "NULL"
self._audio_transport = None
self._hold_request = None
self._ice_state = "NULL"
self._lock = RLock()
self._rtp_transport = None
self.session = None
self._try_ice = False
self._try_forced_srtp = False
self._use_srtp = False
# The three attributes below are introduced by this diff; their users are
# outside the part of the file visible here.
+ self._initialized = False
+ self._done = False
+ self._failure_reason = None
+
# Attach the audio device to the stream's bridge.
self.bridge.add(self.device)
# Audio properties
#
@property
def codec(self):
    """Codec reported by the audio transport, or None without a transport."""
    transport = self._audio_transport
    if not transport:
        return None
    return transport.codec

@property
def consumer_slot(self):
    """Slot of the audio transport, or None without a transport."""
    transport = self._audio_transport
    if not transport:
        return None
    return transport.slot

@property
def producer_slot(self):
    """Slot of the audio transport, or None without a transport or while muted."""
    transport = self._audio_transport
    if transport and not self.muted:
        return transport.slot
    return None

@property
def sample_rate(self):
    """Sample rate reported by the audio transport, or None without a transport."""
    transport = self._audio_transport
    if not transport:
        return None
    return transport.sample_rate

@property
def statistics(self):
    """Statistics reported by the audio transport, or None without a transport."""
    transport = self._audio_transport
    if not transport:
        return None
    return transport.statistics
@property
def muted(self):
    """Whether the stream is muted (False until explicitly set)."""
    return self.__dict__.get('muted', False)

@muted.setter
def muted(self, value):
    # Only real booleans are accepted.
    if not isinstance(value, bool):
        raise ValueError("illegal value for muted property: %r" % (value,))
    if value == self.muted:
        return
    # Muting changes producer_slot, so capture it before and after the change.
    previous_slot = self.producer_slot
    self.__dict__['muted'] = value
    data = NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=previous_slot, new_producer_slot=self.producer_slot)
    NotificationCenter().post_notification('AudioPortDidChangeSlots', sender=self, data=data)
# RTP properties
#
@property
def local_rtp_address(self):
    """Local RTP address of the transport, or None without a transport."""
    transport = self._rtp_transport
    if not transport:
        return None
    return transport.local_rtp_address

@property
def local_rtp_port(self):
    """Local RTP port of the transport, or None without a transport."""
    transport = self._rtp_transport
    if not transport:
        return None
    return transport.local_rtp_port

@property
def remote_rtp_address(self):
    """Remote RTP address: the received one when ICE is in use, else the one from SDP."""
    transport = self._rtp_transport
    if not transport:
        return None
    if self._ice_state == "IN_USE":
        return transport.remote_rtp_address_received
    return transport.remote_rtp_address_sdp

@property
def remote_rtp_port(self):
    """Remote RTP port: the received one when ICE is in use, else the one from SDP."""
    transport = self._rtp_transport
    if not transport:
        return None
    if self._ice_state == "IN_USE":
        return transport.remote_rtp_port_received
    return transport.remote_rtp_port_sdp

@property
def local_rtp_candidate_type(self):
    """Local RTP candidate type of the transport, or None without a transport."""
    transport = self._rtp_transport
    if not transport:
        return None
    return transport.local_rtp_candidate_type

@property
def remote_rtp_candidate_type(self):
    """Remote RTP candidate type of the transport, or None without a transport."""
    transport = self._rtp_transport
    if not transport:
        return None
    return transport.remote_rtp_candidate_type

@property
def srtp_active(self):
    """Whether the transport reports SRTP as active (False without a transport)."""
    transport = self._rtp_transport
    if not transport:
        return False
    return transport.srtp_active

@property
def ice_active(self):
    """True while the stream's ICE state is "IN_USE"."""
    ice_state = self._ice_state
    return ice_state == "IN_USE"
# Generic properties
#
@property
def on_hold(self):
    """Truthy when either side has put the stream on hold."""
    held_locally = self.on_hold_by_local
    return held_locally or self.on_hold_by_remote
# Public methods
#
@classmethod
def new_from_sdp(cls, session, remote_sdp, stream_index):
    """Build a stream for the media line *stream_index* of *remote_sdp*.

    :raises UnknownStreamError: the media line is not audio.
    :raises InvalidStreamError: the line is audio but cannot be used
        (unexpected transport, SRTP required locally but not offered, or no
        codec in common).
    :returns: a new stream that remembers the remote SDP, the media index
        and the SRTP properties of the offer.
    """
    # TODO: actually validate the SDP
    settings = SIPSimpleSettings()
    remote_stream = remote_sdp.media[stream_index]

    if remote_stream.media != 'audio':
        raise UnknownStreamError

    transport = remote_stream.transport
    if transport not in ('RTP/AVP', 'RTP/SAVP'):
        raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in audio stream, got %s" % transport)

    srtp_mandatory = session.account.rtp.srtp_encryption == "mandatory"
    if srtp_mandatory and not remote_stream.has_srtp:
        raise InvalidStreamError("SRTP is locally mandatory but it's not remotely enabled")

    # The account's codec list takes precedence over the global one.
    supported_codecs = session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list
    common_codecs = (codec for codec in remote_stream.codec_list if codec in supported_codecs)
    if not any(common_codecs):
        raise InvalidStreamError("no compatible codecs found")

    stream = cls()
    stream._incoming_remote_sdp = remote_sdp
    stream._incoming_stream_index = stream_index
    stream._incoming_stream_has_srtp = remote_stream.has_srtp
    stream._incoming_stream_has_srtp_forced = remote_stream.transport == 'RTP/SAVP'
    return stream
def initialize(self, session, direction):
with self._lock:
self._streams.append(self)
if self.state != "NULL":
raise RuntimeError("AudioStream.initialize() may only be called in the NULL state")
self.state = "INITIALIZING"
self.session = session
if hasattr(self, "_incoming_remote_sdp"):
# ICE attributes could come at the session level or at the media level
remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index]
self._try_ice = (remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates
self._use_srtp = self._incoming_stream_has_srtp
self._try_forced_srtp = self._incoming_stream_has_srtp_forced
if self._incoming_stream_has_srtp_forced and not self._use_srtp:
self.state = "ENDED"
- self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason="SRTP is remotely mandatory but it's not locally enabled"))
+ self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason="SRTP is remotely mandatory but it's not locally enabled"))
return
del self._incoming_stream_has_srtp
del self._incoming_stream_has_srtp_forced
else:
# TODO: Always use ICE? New settings object?
#self._try_ice = self.session.account.nat_traversal.use_ice
self._try_ice = True
self._use_srtp = self.session.account.rtp.srtp_encryption != "disabled"
self._try_forced_srtp = self.session.account.rtp.srtp_encryption == "mandatory"
self._init_rtp_transport()
- def get_local_media(self, for_offer):
+ def get_local_media(self, remote_sdp=None, index=0):
with self._lock:
if self.state not in ["INITIALIZED", "WAIT_ICE", "ESTABLISHED"]:
- raise RuntimeError("AudioStream.get_local_media() may only be " +
- "called in the INITIALIZED, WAIT_ICE or ESTABLISHED states")
- if for_offer:
+ raise RuntimeError("AudioStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states")
+ if remote_sdp is None:
+ # offer
old_direction = self._audio_transport.direction
if old_direction is None:
new_direction = "sendrecv"
elif "send" in old_direction:
new_direction = ("sendonly" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "sendrecv")
else:
new_direction = ("inactive" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "recvonly")
else:
new_direction = None
- return self._audio_transport.get_local_media(for_offer, new_direction)
+ return self._audio_transport.get_local_media(remote_sdp, index, new_direction)
def start(self, local_sdp, remote_sdp, stream_index):
with self._lock:
if self.state != "INITIALIZED":
- raise RuntimeError("AudioStream.start() may only be " +
- "called in the INITIALIZED state")
+ raise RuntimeError("AudioStream.start() may only be called in the INITIALIZED state")
settings = SIPSimpleSettings()
self._audio_transport.start(local_sdp, remote_sdp, stream_index, no_media_timeout=settings.rtp.timeout,
media_check_interval=settings.rtp.timeout)
self._check_hold(self._audio_transport.direction, True)
if self._try_ice:
self.state = 'WAIT_ICE'
else:
self.state = 'ESTABLISHED'
self.notification_center.post_notification('MediaStreamDidStart', sender=self)
def validate_update(self, remote_sdp, stream_index):
with self._lock:
# TODO: implement
return True
def update(self, local_sdp, remote_sdp, stream_index):
with self._lock:
connection = remote_sdp.media[stream_index].connection or remote_sdp.connection
- if connection.address != self._rtp_transport.remote_rtp_address_sdp or self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port:
+ if not self._rtp_transport.ice_active and (connection.address != self._rtp_transport.remote_rtp_address_sdp or self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port):
settings = SIPSimpleSettings()
old_consumer_slot = self.consumer_slot
old_producer_slot = self.producer_slot
self.notification_center.remove_observer(self, sender=self._audio_transport)
self._audio_transport.stop()
try:
self._audio_transport = AudioTransport(self.mixer, self._rtp_transport, remote_sdp, stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list))
except SIPCoreError, e:
self.state = "ENDED"
- self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=e.args[0]))
+ self._failure_reason = e.args[0]
+ self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=self._failure_reason))
return
self.notification_center.add_observer(self, sender=self._audio_transport)
self._audio_transport.start(local_sdp, remote_sdp, stream_index, no_media_timeout=settings.rtp.timeout, media_check_interval=settings.rtp.timeout)
self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=True,
old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot,
old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot))
if connection.address == '0.0.0.0' and remote_sdp.media[stream_index].direction == 'sendrecv':
self._audio_transport.update_direction('recvonly')
self._check_hold(self._audio_transport.direction, False)
self.notification_center.post_notification('AudioStreamDidChangeRTPParameters', sender=self)
else:
new_direction = local_sdp.media[stream_index].direction
self._audio_transport.update_direction(new_direction)
self._check_hold(new_direction, False)
self._hold_request = None
def hold(self):
with self._lock:
if self.on_hold_by_local or self._hold_request == 'hold':
return
if self.state == "ESTABLISHED" and self.direction != "inactive":
self.bridge.remove(self)
self._hold_request = 'hold'
def unhold(self):
with self._lock:
if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold':
return
if self.state == "ESTABLISHED" and self._hold_request == 'hold':
self.bridge.add(self)
self._hold_request = None if self._hold_request == 'hold' else 'unhold'
def deactivate(self):
- pass
+ with self._lock:
+ self.bridge.stop()
def end(self):
with self._lock:
- if self.state != "ENDED":
- if self._audio_transport is not None:
- self.notification_center.post_notification('MediaStreamWillEnd', sender=self)
- self._audio_transport.stop()
- self.notification_center.remove_observer(self, sender=self._audio_transport)
- self._audio_transport = None
- self._rtp_transport = None
- self.state = "ENDED"
- self.notification_center.post_notification('MediaStreamDidEnd', sender=self)
- else:
- self.state = "ENDED"
- self.bridge.stop()
+ if not self._initialized or self._done:
+ return
+ self._done = True
+ self.notification_center.post_notification('MediaStreamWillEnd', sender=self)
+ if self._audio_transport is not None:
+ self._audio_transport.stop()
+ self.notification_center.remove_observer(self, sender=self._audio_transport)
+ self._audio_transport = None
+ self._rtp_transport = None
+ self.state = "ENDED"
+ self.notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason))
self.session = None
def reset(self, stream_index):
with self._lock:
if self.direction == "inactive" and not self.on_hold_by_local:
new_direction = "sendrecv"
self._audio_transport.update_direction(new_direction)
self._check_hold(new_direction, False)
# TODO: do a full reset, re-creating the AudioTransport, so that a new offer
# would contain all codecs and ICE would be renegotiated -Saul
def send_dtmf(self, digit):
with self._lock:
if self.state != "ESTABLISHED":
raise RuntimeError("AudioStream.send_dtmf() cannot be used in %s state" % self.state)
try:
self._audio_transport.send_dtmf(digit)
except PJSIPError, e:
if not e.args[0].endswith("(PJ_ETOOMANY)"):
raise
# Notification handling
#
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_RTPTransportDidFail(self, notification):
with self._lock:
self.notification_center.remove_observer(self, sender=notification.sender)
if self.state == "ENDED":
return
self._try_next_rtp_transport(notification.data.reason)
def _NH_RTPTransportDidInitialize(self, notification):
settings = SIPSimpleSettings()
rtp_transport = notification.sender
with self._lock:
if not rtp_transport.use_ice:
self.notification_center.remove_observer(self, sender=rtp_transport)
if self.state == "ENDED":
return
del self._rtp_args
del self._stun_servers
try:
if hasattr(self, "_incoming_remote_sdp"):
try:
audio_transport = AudioTransport(self.mixer, rtp_transport, self._incoming_remote_sdp, self._incoming_stream_index,
codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list))
finally:
del self._incoming_remote_sdp
del self._incoming_stream_index
else:
audio_transport = AudioTransport(self.mixer, rtp_transport, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list))
except SIPCoreError, e:
self.state = "ENDED"
- self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=e.args[0]))
+ self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0]))
return
self._rtp_transport = rtp_transport
self._audio_transport = audio_transport
self.notification_center.add_observer(self, sender=audio_transport)
+ self._initialized = True
self.state = "INITIALIZED"
self.notification_center.post_notification('MediaStreamDidInitialize', sender=self)
def _NH_RTPAudioStreamGotDTMF(self, notification):
self.notification_center.post_notification('AudioStreamGotDTMF', sender=self, data=NotificationData(digit=notification.data.digit))
def _NH_RTPAudioTransportDidTimeout(self, notification):
self.notification_center.post_notification('AudioStreamDidTimeout', sender=self)
def _NH_RTPTransportICENegotiationStateDidChange(self, notification):
self.notification_center.post_notification('AudioStreamICENegotiationStateDidChange', sender=self, data=notification.data)
def _NH_RTPTransportICENegotiationDidSucceed(self, notification):
- self._ice_state = "IN_USE"
rtp_transport = notification.sender
self.notification_center.remove_observer(self, sender=rtp_transport)
with self._lock:
if self.state != "WAIT_ICE":
return
+ self._ice_state = "IN_USE"
self.notification_center.post_notification('AudioStreamICENegotiationDidSucceed', sender=self, data=notification.data)
self.state = 'ESTABLISHED'
self.notification_center.post_notification('MediaStreamDidStart', sender=self)
def _NH_RTPTransportICENegotiationDidFail(self, notification):
- self._ice_state = "FAILED"
rtp_transport = notification.sender
self.notification_center.remove_observer(self, sender=rtp_transport)
with self._lock:
if self.state != "WAIT_ICE":
return
+ self._ice_state = "FAILED"
self.notification_center.post_notification('AudioStreamICENegotiationDidFail', sender=self, data=notification.data)
self.state = 'ESTABLISHED'
self.notification_center.post_notification('MediaStreamDidStart', sender=self)
# Private methods
#
def _init_rtp_transport(self, stun_servers=None):
self._rtp_args = dict()
self._rtp_args["use_srtp"] = self._use_srtp
self._rtp_args["srtp_forced"] = self._use_srtp and self._try_forced_srtp
self._rtp_args["use_ice"] = self._try_ice
self._stun_servers = [(None, None)]
if stun_servers:
self._stun_servers.extend(reversed(stun_servers))
self._try_next_rtp_transport()
def _try_next_rtp_transport(self, failure_reason=None):
if self._stun_servers:
stun_address, stun_port = self._stun_servers.pop()
observer_added = False
try:
rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args)
self.notification_center.add_observer(self, sender=rtp_transport)
observer_added = True
rtp_transport.set_INIT()
except SIPCoreError, e:
if observer_added:
self.notification_center.remove_observer(self, sender=rtp_transport)
self._try_next_rtp_transport(e.args[0])
else:
self.state = "ENDED"
- self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=failure_reason))
+ self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason))
def _check_hold(self, direction, is_initial):
was_on_hold_by_local = self.on_hold_by_local
was_on_hold_by_remote = self.on_hold_by_remote
was_inactive = self.direction == "inactive"
self.direction = direction
inactive = self.direction == "inactive"
self.on_hold_by_local = was_on_hold_by_local if inactive else direction == "sendonly"
self.on_hold_by_remote = "send" not in direction
if (is_initial or was_on_hold_by_local or was_inactive) and not inactive and not self.on_hold_by_local and self._hold_request != 'hold':
self.bridge.add(self)
if not was_on_hold_by_local and self.on_hold_by_local:
self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="local", on_hold=True))
if was_on_hold_by_local and not self.on_hold_by_local:
self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="local", on_hold=False))
if not was_on_hold_by_remote and self.on_hold_by_remote:
self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="remote", on_hold=True))
if was_on_hold_by_remote and not self.on_hold_by_remote:
self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="remote", on_hold=False))

File Metadata

Mime Type
text/x-diff
Expires
Sat, Dec 28, 4:41 PM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3453293
Default Alt Text
(58 KB)

Event Timeline