Page MenuHomePhabricator

session.py
No OneTemporary

session.py

from thread import allocate_lock
from datetime import datetime
from zope.interface import implements
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python.util import Singleton
from application.system import default_host_ip
from pypjua.engine import Engine
from pypjua.core import Invitation, SDPSession, SDPMedia, SDPConnection, RTPTransport, AudioTransport
class TimestampedNotificationData(NotificationData):
def __init__(self, **kwargs):
self.timestamp = datetime.now()
NotificationData.__init__(self, **kwargs)
class Session(object):
"""Represents a session.
Attributes:
state: The state of the object as a string
remote_user_agent: The user agent of the remote party, once detected
rtp_options: the RTPTransport options fetched from the SessionManager
at object creation."""
def __init__(self):
"""Instatiates a new Session object for an incoming or outgoing
session. Initially the object is in the NULL state."""
self.session_manager = SessionManager()
self.rtp_options = self.session_manager.rtp_config.__dict__.copy()
self.state = "NULL"
self.remote_user_agent = None
self._lock = allocate_lock()
self._inv = None
self._audio_sdp_index = -1
self._audio_transport = None
# user interface
def new(self, callee_uri, credentials, route=None, use_audio=False):
"""Creates a new session to the callee with the requested stream(s).
Moves the object from the NULL into the CALLING state."""
self._lock.acquire()
try:
if self.state != "NULL":
raise RuntimeError("This method can only be called while in the NULL state")
if not any([use_audio]):
raise RuntimeError("No media stream requested")
sdp_index = 0
local_address = self.rtp_options["local_rtp_address"]
local_sdp = SDPSession(local_address, connection=SDPConnection(local_address))
if use_audio:
self._audio_sdp_index = sdp_index
sdp_index += 1
local_sdp.media.append(self._init_audio())
self._inv = Invitation(credentials, callee_uri, route=route)
self._inv.set_offered_local_sdp(local_sdp)
self.session_manager.session_mapping[self._inv] = self
self._inv.send_invite()
self.state = "CALLING"
except:
self._stop_media()
self._audio_sdp_index = -1
raise
finally:
self._lock.release()
def accept(self, use_audio=False):
"""Accept an incoming session, using the requested stream(s).
Moves the object from the INCOMING to the ACCEPTING state."""
self._lock.acquire()
try:
if self.state != "INCOMING":
raise RuntimeError("This method can only be called while in the INCOMING state")
remote_sdp = self._inv.get_offered_remote_sdp()
local_address = self.rtp_options["local_rtp_address"]
local_sdp = SDPSession(local_address, connection=SDPConnection(local_address), media=len(remote_sdp.media)*[None])
sdp_media_todo = range(len(remote_sdp.media))
if use_audio:
for audio_sdp_index, sdp_media in enumerate(remote_sdp.media):
if sdp_media.media == "audio":
sdp_media_todo.remove(audio_sdp_index)
self._audio_sdp_index = audio_sdp_index
local_sdp.media[audio_sdp_index] = (self._init_audio(remote_sdp))
break
if self._audio_sdp_index == -1:
raise RuntimeError("Use of audio requested, but audio was not proposed by remote party")
if len(sdp_media_todo) == len(remote_sdp.media):
raise RuntimeError("None of the streams proposed by the remote party was accepted")
for reject_media_index in sdp_media_todo:
remote_media = remote_sdp.media[reject_media_index]
local_sdp.media[reject_media_index] = SDPMedia(remote_media.media, 0, remote_media.transport, formats=remote_media.formats, attributes=remote_media.attributes)
self._inv.set_offered_local_sdp(local_sdp)
self._inv.accept_invite()
self.state = "ACCEPTING"
except:
self._stop_media()
self._audio_sdp_index = -1
raise
finally:
self._lock.release()
def reject(self):
"""Rejects an incoming session. Moves the object from the INCOMING to
the TERMINATING state."""
if self.state != "INCOMING":
raise RuntimeError("This method can only be called while in the INCOMING state")
self.terminate()
def add_audio(self):
"""Add an audio stream to an already established session."""
self._lock.acquire()
try:
if self.state != "ESTABLISHED":
raise RuntimeError("This method can only be called while in the ESTABLISHED state")
if self._audio_transport is not None:
raise RuntimeError("An audio stream is already active whithin this session")
# TODO: implement
finally:
self._lock.release()
def accept_proposal(self):
"""Accept a proposal of stream(s) being added. Moves the object from
the PROPOSED state to the ESTABLISHED state."""
self._lock.acquire()
try:
if self.state != "PROPOSED":
raise RuntimeError("This method can only be called while in the PROPOSED state")
# TODO: implement
finally:
self._lock.release()
def reject_proposal(self):
"""Reject a proposal of stream(s) being added. Moves the object from
the PROPOSED state to the ESTABLISHED state."""
self._lock.acquire()
try:
if self.state != "PROPOSED":
raise RuntimeError("This method can only be called while in the PROPOSED state")
# TODO: implement
finally:
self._lock.release()
def place_on_hold(self):
"""Put an established session on hold. This moves the object from the
ESTABLISHED state to the ONHOLD state."""
self._lock.acquire()
try:
if self.state != "ESTABLISHED":
raise RuntimeError("This method can only be called while in the ESTABLISHED state")
finally:
self._lock.release()
def take_out_of_hold(self):
"""Takes a session that was previous put on hold out of hold. This
moves the object from the ONHOLD state to the ESTABLISHED state."""
self._lock.acquire()
try:
if self.state != "ONHOLD":
raise RuntimeError("This method can only be called while in the ONHOLD state")
finally:
self._lock.release()
def terminate(self):
"""Terminates the session from whatever state it is in.
Moves the object to the TERMINATING state."""
self._lock.acquire()
try:
if self.state in ["NULL", "TERMINATING", "TERMINATED"]:
raise RuntimeError("This method cannot be called while in the NULL or TERMINATED states")
if self._inv.state != "DISCONNECTING":
self._inv.disconnect()
self.state = "TERMINATING"
finally:
self._lock.release()
def _init_audio(self, remote_sdp=None):
"""Initialize everything needed for an audio stream and return a
SDPMedia object describing it. Called internally."""
rtp_transport = RTPTransport(**self.rtp_options)
if remote_sdp is None:
self._audio_transport = AudioTransport(rtp_transport)
else:
self._audio_transport = AudioTransport(rtp_transport, remote_sdp, self._audio_sdp_index)
return self._audio_transport.get_local_media(remote_sdp is None)
def _update_media(self, local_sdp, remote_sdp):
"""Update the media stream(s) according to the newly negotiated SDP.
This will start, stop or change the stream(s). Called by
SessionManager."""
if self._audio_transport:
if local_sdp.media[self._audio_sdp_index].port and remote_sdp.media[self._audio_sdp_index].port:
self._update_audio(local_sdp, remote_sdp)
else:
self._stop_audio()
def _update_audio(self, local_sdp, remote_sdp):
"""Update the audio stream. Will be called locally from
_update_media()."""
if self._audio_transport.is_active:
pass
else:
self._audio_transport.start(local_sdp, remote_sdp, self._audio_sdp_index)
Engine().connect_audio_transport(self._audio_transport)
def _stop_media(self):
"""Stop all media streams. This will be called by SessionManager when
the session ends."""
if self._audio_transport:
self._stop_audio()
def _stop_audio(self):
"""Stop the audio stream. This will be called locally, either from
_update_media() or _stop_media()."""
if self._audio_transport.is_active:
Engine().disconnect_audio_transport(self._audio_transport)
self._audio_transport.stop()
self._audio_transport = None
def _cancel_media(self):
if self._audio_transport is not None and not self._audio_transport.is_active:
self._stop_audio()
class RTPConfiguration(object):
def __init__(self, local_rtp_address=default_host_ip, use_srtp=False, srtp_forced=False, use_ice=False, ice_stun_address=None, ice_stun_port=3478, *args, **kwargs):
self.local_rtp_address = local_rtp_address
self.use_srtp = use_srtp
self.srtp_forced = srtp_forced
self.use_ice = use_ice
self.ice_stun_address = ice_stun_address
self.ice_stun_port = ice_stun_port
class SessionManager(object):
"""The one and only SessionManager, a singleton.
The application needs to create this and then pass its handle_event
method to the Engine as event_handler.
Attributes:
rtp_config: RTPConfiguration object
session_mapping: A dictionary mapping Invitation objects to Session
objects."""
__metaclass__ = Singleton
implements(IObserver)
def __init__(self):
"""Creates a new SessionManager object."""
self.rtp_config = RTPConfiguration()
self.session_mapping = {}
self.notification_center = NotificationCenter()
self.notification_center.add_observer(self, "SCInvitationChangedState")
self.notification_center.add_observer(self, "SCInvitationGotSDPUpdate")
def handle_notification(self, notification):
"""Catches the SCInvitationChangedState and SCInvitationGotSDPUpdate
notifications and takes the appropriate action on the associated
Session object. If needed, it will also post a notification related
to the Session for consumption by the application."""
data = notification.data
inv = notification.sender
session = self.session_mapping.get(inv, None)
if notification.name == "SCInvitationChangedState":
if data.state == "INCOMING":
remote_media = [media.media for media in inv.get_offered_remote_sdp().media]
# TODO: check if the To header/request URI is one of ours
if not any(supported_media in remote_media for supported_media in ["audio"]):
inv.disconnect(415)
else:
inv.respond_to_invite_provisionally(180)
session = Session()
session.state = "INCOMING"
session._inv = inv
session.remote_user_agent = data.headers.get("User-Agent", None)
self.session_mapping[inv] = session
self.notification_center.post_notification("SCSessionIsIncoming", session, TimestampedNotificationData(audio_proposed="audio" in remote_media))
else:
if session is None:
return
session._lock.acquire()
try:
prev_session_state = session.state
if data.prev_state == "CALLING" and data.state == "EARLY":
session.state = "RINGING"
elif data.state == "CONNECTING" and inv.is_outgoing:
session.remote_user_agent = data.headers.get("Server", None)
if session.remote_user_agent is None:
session.remote_user_agent = data.headers.get("User-Agent", None)
elif data.state == "CONFIRMED":
session.state = "ESTABLISHED"
elif data.state == "REINVITED":
# TODO: implement
inv.respond_to_reinvite(488)
elif data.state == "DISCONNECTED":
del self.session_mapping[inv]
session.state = "TERMINATED"
if hasattr(data, "headers"):
if session.remote_user_agent is None:
session.remote_user_agent = data.headers.get("Server", None)
if session.remote_user_agent is None:
session.remote_user_agent = data.headers.get("User-Agent", None)
session._stop_media()
session._inv = None
finally:
session._lock.release()
if prev_session_state != session.state:
self.notification_center.post_notification("SCSessionChangedState", session, TimestampedNotificationData(prev_state=prev_session_state, state=session.state))
elif notification.name == "SCInvitationGotSDPUpdate":
if session is None:
return
session._lock.acquire()
try:
if data.succeeded:
session._update_media(data.local_sdp, data.remote_sdp)
else:
session._cancel_media()
finally:
session._lock.release()

File Metadata

Mime Type
text/x-python
Expires
Tue, Nov 26, 5:16 AM (1 d, 10 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3414190
Default Alt Text
session.py (14 KB)

Event Timeline