Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/core/__init__.py b/sipsimple/core/__init__.py
index 00732a96..99b34fe4 100644
--- a/sipsimple/core/__init__.py
+++ b/sipsimple/core/__init__.py
@@ -1,13 +1,13 @@
# Copyright (C) 2010 AG Projects. See LICENSE for details.
#
from sipsimple.core._core import *
from sipsimple.core._engine import *
from sipsimple.core._primitives import *
-required_revision = 102
+required_revision = 103
if CORE_REVISION != required_revision:
raise ImportError("Wrong SIP core revision %d (expected %d)" % (CORE_REVISION, required_revision))
del required_revision
diff --git a/sipsimple/core/_core.invitation.pxi b/sipsimple/core/_core.invitation.pxi
index 3bf18bfe..272b4961 100644
--- a/sipsimple/core/_core.invitation.pxi
+++ b/sipsimple/core/_core.invitation.pxi
@@ -1,920 +1,922 @@
# Copyright (C) 2008-2010 AG Projects. See LICENSE for details.
#
# python imports
import weakref
from errno import EADDRNOTAVAIL
# classes
cdef class SDPPayloads:
def __init__(self):
self.proposed_local = None
self.proposed_remote = None
self.active_local = None
self.active_remote = None
cdef class StateCallbackTimer(Timer):
def __init__(self, state, sub_state, rdata, tdata):
self.state = state
self.sub_state = sub_state
self.rdata = rdata
self.tdata = tdata
cdef class SDPCallbackTimer(Timer):
def __init__(self, int status):
self.status = status
cdef class Invitation:
def __cinit__(self, *args, **kwargs):
self.weakref = weakref.ref(self)
Py_INCREF(self.weakref)
pj_list_init(<pj_list *> &self._route_set)
pj_mutex_create_recursive(_get_ua()._pjsip_endpoint._pool, "invitation_lock", &self._lock)
self._invite_session = NULL
self._dialog = NULL
self._reinvite_transaction = NULL
self._sdp_neg_status = -1
self._timer = None
self.from_header = None
self.to_header = None
self.route_header = None
self.local_contact_header = None
self.credentials = None
self.sdp = SDPPayloads()
self.remote_user_agent = None
self.state = None
self.sub_state = None
self.transport = None
self.direction = None
self.call_id = None
cdef int init_incoming(self, PJSIPUA ua, pjsip_rx_data *rdata, unsigned int inv_options) except -1:
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_sdp_session_ptr_const sdp
cdef pjsip_dialog **dialog_address
cdef pjsip_inv_session **invite_session_address
cdef pjsip_tpselector tp_sel
cdef pjsip_tx_data *tdata
cdef PJSTR contact_header
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
dialog_address = &self._dialog
invite_session_address = &self._invite_session
self.direction = "incoming"
self.transport = rdata.tp_info.transport.type_name.lower()
request_uri = FrozenSIPURI_create(<pjsip_sip_uri *> pjsip_uri_get_uri(rdata.msg_info.msg.line.req.uri))
if _is_valid_ip(pj_AF_INET(), request_uri.host):
self.local_contact_header = FrozenContactHeader(request_uri)
else:
self.local_contact_header = FrozenContactHeader(FrozenSIPURI(host=_pj_str_to_str(rdata.tp_info.transport.local_name.host),
user=request_uri.user, port=rdata.tp_info.transport.local_name.port,
parameters=(frozendict(transport=self.transport) if self.transport != "udp" else frozendict())))
contact_header = PJSTR(self.local_contact_header.body)
with nogil:
status = pjsip_dlg_create_uas(pjsip_ua_instance(), rdata, &contact_header.pj_str, dialog_address)
if status != 0:
raise PJSIPError("Could not create dialog for new INVITE session", status)
with nogil:
status = pjsip_inv_create_uas(dialog_address[0], rdata, NULL, inv_options, invite_session_address)
if status != 0:
raise PJSIPError("Could not create new INVITE session", status)
tp_sel.type = PJSIP_TPSELECTOR_TRANSPORT
tp_sel.u.transport = rdata.tp_info.transport
with nogil:
status = pjsip_dlg_set_transport(dialog_address[0], &tp_sel)
if status != 0:
raise PJSIPError("Could not set transport for INVITE session", status)
with nogil:
status = pjsip_inv_initial_answer(invite_session_address[0], rdata, 100, NULL, NULL, &tdata)
if status != 0:
raise PJSIPError("Could not create initial (unused) response to INVITE", status)
with nogil:
pjsip_tx_data_dec_ref(tdata)
if self._invite_session.neg != NULL:
if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER:
pjmedia_sdp_neg_get_neg_remote(self._invite_session.neg, &sdp)
self.sdp.proposed_remote = FrozenSDPSession_create(sdp)
self._invite_session.mod_data[ua._module.id] = <void *> self.weakref
self.call_id = _pj_str_to_str(self._dialog.call_id.id)
event_dict = dict(obj=self, prev_state=self.state, state="incoming", originator="remote")
_pjsip_msg_to_dict(rdata.msg_info.msg, event_dict)
self.state = "incoming"
self.remote_user_agent = event_dict['headers']['User-Agent'].body if 'User-Agent' in event_dict['headers'] else None
_add_event("SIPInvitationChangedState", event_dict)
self.from_header = FrozenFromHeader_create(rdata.msg_info.from_hdr)
self.to_header = FrozenToHeader_create(rdata.msg_info.to_hdr)
except:
if self._invite_session != NULL:
with nogil:
pjsip_inv_terminate(invite_session_address[0], 500, 0)
self._invite_session = NULL
elif self._dialog != NULL:
with nogil:
pjsip_dlg_terminate(dialog_address[0])
self._dialog = NULL
raise
finally:
with nogil:
pj_mutex_unlock(lock)
return 0
def send_invite(self, FromHeader from_header not None, ToHeader to_header not None, RouteHeader route_header not None, ContactHeader contact_header not None,
SDPSession sdp not None, Credentials credentials=None, list extra_headers not None=list(), timeout=None):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_sdp_session *local_sdp
cdef pjsip_cred_info *cred_info
cdef pjsip_dialog **dialog_address
cdef pjsip_inv_session **invite_session_address
cdef pjsip_route_hdr *route_set
cdef pjsip_tx_data *tdata
cdef PJSIPUA ua
cdef PJSTR contact_header_str
cdef PJSTR from_header_str
cdef PJSTR to_header_str
cdef PJSTR target_str
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
dialog_address = &self._dialog
invite_session_address = &self._invite_session
route_set = <pjsip_route_hdr *> &self._route_set
if self.state != None:
raise SIPCoreInvalidStateError('Can only transition to the "outgoing" state from the "None" state, currently in the "%s" state' % self.state)
if timeout is not None and timeout <= 0:
raise ValueError("Timeout value must be positive")
self.transport = route_header.uri.parameters.get("transport", "udp")
self.direction = "outgoing"
self.credentials = FrozenCredentials.new(credentials) if credentials is not None else None
self.route_header = FrozenRouteHeader.new(route_header)
self.route_header.uri.parameters.dict["lr"] = None # always send lr parameter in Route header
self.route_header.uri.parameters.dict["hide"] = None # always hide Route header
self.local_contact_header = FrozenContactHeader.new(contact_header)
self.sdp.proposed_local = FrozenSDPSession.new(sdp) if sdp is not None else None
from_header_str = PJSTR(from_header.body)
to_header_str = PJSTR(to_header.body)
contact_header_str = PJSTR(self.local_contact_header.body)
target_uri = SIPURI.new(to_header.uri)
if target_uri.parameters.get("transport", "udp").lower() != self.transport:
target_uri.parameters["transport"] = self.transport
target_str = PJSTR(str(target_uri))
with nogil:
status = pjsip_dlg_create_uac(pjsip_ua_instance(), &from_header_str.pj_str, &contact_header_str.pj_str,
&to_header_str.pj_str, &target_str.pj_str, dialog_address)
if status != 0:
raise PJSIPError("Could not create dialog for outgoing INVITE session", status)
with nogil:
pjsip_dlg_inc_lock(self._dialog)
self.from_header = FrozenFromHeader_create(self._dialog.local.info)
self.to_header = FrozenToHeader.new(to_header)
self.call_id = _pj_str_to_str(self._dialog.call_id.id)
local_sdp = self.sdp.proposed_local.get_sdp_session() if sdp is not None else NULL
with nogil:
status = pjsip_inv_create_uac(dialog_address[0], local_sdp, 0, invite_session_address)
if status != 0:
raise PJSIPError("Could not create outgoing INVITE session", status)
self._invite_session.mod_data[ua._module.id] = <void *> self.weakref
if self.credentials is not None:
cred_info = self.credentials.get_cred_info()
with nogil:
status = pjsip_auth_clt_set_credentials(&dialog_address[0].auth_sess, 1, cred_info)
if status != 0:
raise PJSIPError("Could not set credentials for INVITE session", status)
_BaseRouteHeader_to_pjsip_route_hdr(self.route_header, &self._route_header, self._dialog.pool)
pj_list_insert_after(<pj_list *> &self._route_set, <pj_list *> &self._route_header)
with nogil:
status = pjsip_dlg_set_route_set(dialog_address[0], route_set)
if status != 0:
raise PJSIPError("Could not set route for INVITE session", status)
with nogil:
status = pjsip_inv_invite(invite_session_address[0], &tdata)
if status != 0:
raise PJSIPError("Could not create INVITE message", status)
_add_headers_to_tdata(tdata, extra_headers)
with nogil:
status = pjsip_inv_send_msg(invite_session_address[0], tdata)
if status != 0:
raise PJSIPError("Could not send initial INVITE", status)
if timeout is not None:
self._timer = Timer()
self._timer.schedule(timeout, <timer_callback>self._cb_timer_disconnect, self)
with nogil:
pjsip_dlg_dec_lock(self._dialog)
except Exception, e:
if isinstance(e, PJSIPError) and e.errno == EADDRNOTAVAIL:
self._invite_session = NULL
pjsip_dlg_dec_lock(self._dialog)
self._dialog = NULL
raise
if self._invite_session != NULL:
pjsip_inv_terminate(self._invite_session, 500, 0)
self._invite_session = NULL
elif self._dialog != NULL:
pjsip_dlg_dec_lock(self._dialog)
self._dialog = NULL
raise
finally:
with nogil:
pj_mutex_unlock(lock)
def send_response(self, int code, str reason=None, BaseContactHeader contact_header=None, BaseSDPSession sdp=None, list extra_headers not None=list()):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pj_str_t reason_str
cdef pjmedia_sdp_session *local_sdp
cdef pjsip_inv_session *invite_session
cdef pjsip_tx_data *tdata
cdef PJSIPUA ua
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
invite_session = self._invite_session
if reason is not None:
_str_to_pj_str(reason, &reason_str)
if self.state not in ("incoming", "early", "connected"):
raise SIPCoreInvalidStateError('Can only send response from the "incoming", "early" and "connected" states current in the "%s" state.' % self.state)
if self.state == "early" and self.direction != "incoming":
raise SIPCoreInvalidStateError('Cannot send response in the "early" state for an outgoing INVITE')
if self.state == "connected" and self.sub_state != "received_proposal":
raise SIPCoreInvalidStateError('Cannot send response in the "connected" state if a proposal has not been received')
if contact_header is not None:
self._update_contact_header(contact_header)
if 200 <= code < 300 and sdp is None:
raise SIPCoreError("Local SDP needs to be set for a positive response")
if code >= 300 and sdp is not None:
raise SIPCoreError("Local SDP cannot be specified for a negative response")
self.sdp.proposed_local = FrozenSDPSession.new(sdp) if sdp is not None else None
local_sdp = self.sdp.proposed_local.get_sdp_session() if sdp is not None else NULL
with nogil:
status = pjsip_inv_answer(invite_session, code, &reason_str if reason is not None else NULL,
local_sdp, &tdata)
if status != 0:
raise PJSIPError("Could not create %d reply to INVITE" % code, status)
_add_headers_to_tdata(tdata, extra_headers)
with nogil:
status = pjsip_inv_send_msg(invite_session, tdata)
if status != 0:
raise PJSIPError("Could not send %d response" % code, status)
finally:
with nogil:
pj_mutex_unlock(lock)
def send_reinvite(self, BaseContactHeader contact_header=None, BaseSDPSession sdp=None, list extra_headers not None=list()):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_sdp_session *local_sdp
cdef pjsip_inv_session *invite_session
cdef pjsip_tx_data *tdata
cdef PJSIPUA ua
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
invite_session = self._invite_session
if self.state != "connected":
raise SIPCoreError('Can only send re-INVITE in "connected" state, not "%s" state' % self.state)
if self.sub_state != "normal":
raise SIPCoreError('Can only send re-INVITE if no another re-INVITE transaction is active')
if contact_header is not None:
self._update_contact_header(contact_header)
self.sdp.proposed_local = FrozenSDPSession.new(sdp) if sdp is not None else self.sdp.active_local
local_sdp = self.sdp.proposed_local.get_sdp_session()
with nogil:
status = pjsip_inv_reinvite(invite_session, NULL, local_sdp, &tdata)
if status != 0:
raise PJSIPError("Could not create re-INVITE message", status)
_add_headers_to_tdata(tdata, extra_headers)
with nogil:
status = pjsip_inv_send_msg(invite_session, tdata)
if status != 0:
raise PJSIPError("Could not send re-INVITE", status)
self._reinvite_transaction = self._invite_session.invite_tsx
self.sub_state = "sent_proposal"
event_dict = dict(obj=self, prev_state="connected", state="connected", prev_sub_state="normal", sub_state="sent_proposal", originator="local")
_pjsip_msg_to_dict(tdata.msg, event_dict)
_add_event("SIPInvitationChangedState", event_dict)
finally:
with nogil:
pj_mutex_unlock(lock)
def cancel_reinvite(self):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjsip_inv_session *invite_session
cdef pjsip_tx_data *tdata
cdef PJSIPUA ua
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
invite_session = self._invite_session
if not self.sub_state == "sent_proposal":
raise SIPCoreError("re-INVITE can only be cancelled if INVITE session is in 'sent_proposal' sub state")
if self._invite_session == NULL:
raise SIPCoreError("INVITE session is not active")
if self._reinvite_transaction == NULL:
raise SIPCoreError("there is no active re-INVITE transaction")
with nogil:
status = pjsip_inv_cancel_reinvite(invite_session, &tdata)
if status != 0:
raise PJSIPError("Could not create message to CANCEL re-INVITE transaction", status)
if tdata != NULL:
with nogil:
status = pjsip_inv_send_msg(invite_session, tdata)
if status != 0:
raise PJSIPError("Could not send %s" % _pj_str_to_str(tdata.msg.line.req.method.name), status)
finally:
with nogil:
pj_mutex_unlock(lock)
def end(self, list extra_headers not None=list(), timeout=None):
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjsip_inv_session *invite_session
cdef pjsip_tx_data *tdata
cdef PJSIPUA ua
ua = _get_ua()
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
invite_session = self._invite_session
if self.state == "disconnected":
return
if self.state == "disconnecting":
raise SIPCoreError('INVITE session is already in the "disconnecting" state')
if self._invite_session == NULL:
raise SIPCoreError("INVITE session is not active")
if self.state not in ("outgoing", "early", "connecting", "connected"):
raise SIPCoreError('Can only end the INVITE dialog from the "outgoing", "early", "connecting" and "connected" states' +
'current in the "%s" state.' % self.state)
if self.state == "early" and self.direction != "outgoing":
raise SIPCoreError('Cannot end incoming INVITE dialog while in the "early" state')
if timeout is not None and timeout <= 0:
raise ValueError("Timeout value cannot be negative")
with nogil:
status = pjsip_inv_end_session(invite_session, 0, NULL, &tdata)
if status != 0:
raise PJSIPError("Could not create message to end INVITE session", status)
if tdata != NULL:
_add_headers_to_tdata(tdata, extra_headers)
with nogil:
status = pjsip_inv_send_msg(invite_session, tdata)
if status != 0:
raise PJSIPError("Could not send %s" % _pj_str_to_str(tdata.msg.line.req.method.name), status)
if self._timer is not None:
self._timer.cancel()
self._timer = None
if timeout is not None and timeout > 0:
self._timer = Timer()
self._timer.schedule(timeout, <timer_callback>self._cb_timer_disconnect, self)
event_dict = dict(obj=self, prev_state=self.state, state="disconnecting", originator="local")
if self.state == "connected":
event_dict["prev_sub_state"] = self.sub_state
self.state = "disconnecting"
self.sub_state = None
if tdata != NULL:
_pjsip_msg_to_dict(tdata.msg, event_dict)
_add_event("SIPInvitationChangedState", event_dict)
finally:
with nogil:
pj_mutex_unlock(lock)
property local_identity:
def __get__(self):
if self.direction == 'outgoing':
return self.from_header
elif self.direction == 'incoming':
return self.to_header
else:
return None
property remote_identity:
def __get__(self):
if self.direction == 'incoming':
return self.from_header
elif self.direction == 'outgoing':
return self.to_header
else:
return None
cdef PJSIPUA _check_ua(self):
try:
return _get_ua()
except:
self.state = "disconnected"
self.sub_state = None
self._dialog = NULL
self._invite_session = NULL
self._reinvite_transaction = NULL
cdef int _do_dealloc(self) except -1:
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjsip_inv_session *invite_session
cdef PJSIPUA ua
try:
ua = _get_ua()
except SIPCoreError:
return 0
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
invite_session = self._invite_session
if self._invite_session != NULL:
self._invite_session.mod_data[ua._module.id] = NULL
if self.state != "disconnecting":
with nogil:
pjsip_inv_terminate(invite_session, 481, 0)
self._dialog = NULL
self._invite_session = NULL
self._reinvite_transaction = NULL
if self._timer is not None:
self._timer.cancel()
self._timer = None
finally:
with nogil:
pj_mutex_unlock(lock)
return 0
def __dealloc__(self):
cdef Timer timer
self._do_dealloc()
pj_mutex_destroy(self._lock)
timer = Timer()
try:
timer.schedule(60, deallocate_weakref, self.weakref)
except SIPCoreError:
pass
cdef int _update_contact_header(self, BaseContactHeader contact_header) except -1:
# The PJSIP functions called here don't do much, so there is no need to call them
# without the gil.
cdef pj_str_t contact_str_pj
cdef pjsip_uri *contact
contact_str = str(contact_header.uri)
if contact_header.display_name:
contact_str = "%s <%s>" % (contact_header.display_name, contact_str)
pj_strdup2_with_null(self._dialog.pool, &contact_str_pj, contact_str)
contact = pjsip_parse_uri(self._dialog.pool, contact_str_pj.ptr, contact_str_pj.slen, PJSIP_PARSE_URI_AS_NAMEADDR)
if contact == NULL:
raise SIPCoreError("Not a valid Contact header: %s" % contact_str)
self._dialog.local.contact = pjsip_contact_hdr_create(self._dialog.pool)
self._dialog.local.contact.uri = contact
if contact_header.expires is not None:
self._dialog.local.contact.expires = contact_header.expires
if contact_header.q is not None:
self._dialog.local.contact.q1000 = int(contact_header.q*1000)
parameters = contact_header.parameters.copy()
parameters.pop("q", None)
parameters.pop("expires", None)
_dict_to_pjsip_param(contact_header.parameters, &self._dialog.local.contact.other_param, self._dialog.pool)
self.local_contact_header = FrozenContactHeader.new(contact_header)
return 0
cdef int _fail(self, PJSIPUA ua) except -1:
cdef Timer timer
ua._handle_exception(0)
self._invite_session.mod_data[ua._module.id] = NULL
if self.state != "disconnected":
event_dict = dict(obj=self, prev_state=self.state, state="disconnected", originator="local", disconnect_reason="internal exception occured")
if self.state == "connected":
event_dict["prev_sub_state"] = self.sub_state
self.state = "disconnected"
self.sub_state = None
_add_event("SIPInvitationChangedState", event_dict)
# calling do_dealloc from within a callback makes PJSIP crash
# the handler will be executed after pjsip_endpt_handle_events returns
timer = Timer()
timer.schedule(0, <timer_callback>self._cb_postpoll_fail, self)
return 0
cdef int _cb_state(self, StateCallbackTimer timer) except -1:
cdef int status
+ cdef bint pjsip_error = False
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_sdp_session_ptr_const sdp
cdef pjsip_inv_session *invite_session
cdef object state
cdef object sub_state
cdef object rdata
cdef object tdata
cdef PJSIPUA ua
ua = self._check_ua()
if ua is None:
return 0
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
invite_session = self._invite_session
state = timer.state
sub_state = timer.sub_state
rdata = timer.rdata
tdata = timer.tdata
if state == self.state and sub_state == self.sub_state:
return 0
if state == "connected":
if self.state == "connecting" and self._sdp_neg_status != 0:
self.end()
return 0
if state == "disconnected" and self.state != "disconnecting":
# the invite session may have been destroyed if it failed
if not self._invite_session:
return 0
# we either sent a cancel or a negative reply to an incoming INVITE
if self._invite_session.cancelling or (self.state in ("incoming", "early") and self.direction == "incoming" and rdata is None):
# we caused the disconnect so send the transition to the disconnecting state
+ pjsip_error = True
event_dict = dict(obj=self, prev_state=self.state, state="disconnecting", originator="local")
self.state = "disconnecting"
_add_event("SIPInvitationChangedState", event_dict)
if self.direction == "outgoing" and state in ('connecting', 'connected') and self.state in ('outgoing', 'early') and rdata is not None:
self.to_header = rdata['headers']['To']
event_dict = dict(obj=self, prev_state=self.state, state=state)
if self.state == "connected":
event_dict["prev_sub_state"] = self.sub_state
if state == "connected":
event_dict["sub_state"] = sub_state
event_dict["originator"] = "remote" if rdata is not None else "local"
if rdata is not None:
event_dict.update(rdata)
if tdata is not None:
event_dict.update(tdata)
if self.remote_user_agent is None and state in ('connecting', 'connected') and rdata is not None:
if 'User-Agent' in event_dict['headers']:
self.remote_user_agent = event_dict['headers']['User-Agent'].body
elif 'Server' in event_dict['headers']:
self.remote_user_agent = event_dict['headers']['Server'].body
if state == "connected":
if sub_state == "received_proposal":
self._reinvite_transaction = self._invite_session.invite_tsx
if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER:
pjmedia_sdp_neg_get_neg_remote(self._invite_session.neg, &sdp)
self.sdp.proposed_remote = FrozenSDPSession_create(sdp)
elif sub_state == "sent_proposal":
if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_LOCAL_OFFER:
pjmedia_sdp_neg_get_neg_local(self._invite_session.neg, &sdp)
self.sdp.proposed_local = FrozenSDPSession_create(sdp)
elif self.sub_state in ("received_proposal", "sent_proposal"):
if (rdata, tdata) == (None, None):
event_dict['code'] = 408
event_dict['reason'] = 'Request Timeout'
if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_LOCAL_OFFER:
pjmedia_sdp_neg_cancel_offer(self._invite_session.neg)
if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER:
pjmedia_sdp_neg_cancel_remote_offer(self._invite_session.neg)
self._reinvite_transaction = NULL
if state == "disconnected":
- event_dict["disconnect_reason"] = "user request"
+ event_dict["disconnect_reason"] = "user request" if not pjsip_error else "internal error"
if not self._invite_session.cancelling and rdata is None and self._invite_session.cause > 0:
# pjsip internally generates 408 and 503
if self._invite_session.cause == 408:
if self.direction == "incoming" and self.state == "connecting":
event_dict["disconnect_reason"] = "missing ACK"
else:
event_dict["disconnect_reason"] = "timeout"
else:
event_dict["disconnect_reason"] = _pj_str_to_str(self._invite_session.cause_text)
elif self._invite_session.cancelling and rdata is None and self._invite_session.cause == 408 and self.state == "disconnecting":
# silly pjsip sets cancelling field when we call pjsip_inv_end_session in end even if we send a BYE
event_dict['code'] = 408
event_dict['reason'] = 'Request Timeout'
elif rdata is not None and 'Reason' in event_dict['headers']:
try:
reason = event_dict['headers']['Reason'].text
if reason:
event_dict["disconnect_reason"] = reason
except (ValueError, IndexError):
pass
self._invite_session.mod_data[ua._module.id] = NULL
self._invite_session = NULL
self._dialog = NULL
if self._timer is not None:
self._timer.cancel()
self._timer = None
elif state in ("early", "connecting") and self._timer is not None:
self._timer.cancel()
self._timer = None
self.state = state
self.sub_state = sub_state
_add_event("SIPInvitationChangedState", event_dict)
finally:
with nogil:
pj_mutex_unlock(lock)
return 0
cdef int _cb_sdp_done(self, SDPCallbackTimer timer) except -1:
cdef int status
cdef pj_mutex_t *lock = self._lock
cdef pjmedia_sdp_session_ptr_const sdp
with nogil:
status = pj_mutex_lock(lock)
if status != 0:
raise PJSIPError("failed to acquire lock", status)
try:
self._sdp_neg_status = status
self.sdp.proposed_local = None
self.sdp.proposed_remote = None
if timer.status == 0:
pjmedia_sdp_neg_get_active_local(self._invite_session.neg, &sdp)
local_sdp = SDPSession_create(sdp)
pjmedia_sdp_neg_get_active_remote(self._invite_session.neg, &sdp)
remote_sdp = SDPSession_create(sdp)
if len(local_sdp.media) > len(remote_sdp.media):
local_sdp.media = local_sdp.media[:len(remote_sdp.media)]
if len(remote_sdp.media) > len(local_sdp.media):
remote_sdp.media = remote_sdp.media[:len(local_sdp.media)]
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
if not local_media.port and remote_media.port:
remote_media.port = 0
if not remote_media.port and local_media.port:
local_media.port = 0
self.sdp.active_local = FrozenSDPSession.new(local_sdp)
self.sdp.active_remote = FrozenSDPSession.new(remote_sdp)
if self.state in ["disconnecting", "disconnected"]:
return 0
event_dict = dict(obj=self, succeeded=timer.status == 0)
if timer.status == 0:
event_dict["local_sdp"] = self.sdp.active_local
event_dict["remote_sdp"] = self.sdp.active_remote
else:
event_dict["error"] = _pj_status_to_str(timer.status)
_add_event("SIPInvitationGotSDPUpdate", event_dict)
if self.state in ("incoming", "early") and timer.status != 0:
if self.direction == "incoming":
self.send_response(488)
else:
self.end()
finally:
with nogil:
pj_mutex_unlock(lock)
return 0
cdef int _cb_timer_disconnect(self, timer) except -1:
cdef pjsip_inv_session *invite_session = self._invite_session
with nogil:
pjsip_inv_terminate(invite_session, 408, 1)
cdef int _cb_postpoll_fail(self, timer) except -1:
self._do_dealloc()
# Callback functions
#
cdef void _Invitation_cb_state(pjsip_inv_session *inv, pjsip_event *e) with gil:
cdef pjsip_rx_data *rdata = NULL
cdef pjsip_tx_data *tdata = NULL
cdef object state
cdef object rdata_dict = None
cdef object tdata_dict = None
cdef Invitation invitation
cdef PJSIPUA ua
cdef StateCallbackTimer timer
try:
ua = _get_ua()
except:
return
try:
if inv.state == PJSIP_INV_STATE_INCOMING:
return
if inv.mod_data[ua._module.id] != NULL:
invitation = (<object> inv.mod_data[ua._module.id])()
if invitation is None:
return
state = pjsip_inv_state_name(inv.state).lower()
sub_state = None
if state == "calling":
state = "outgoing"
elif state == "confirmed":
state = "connected"
sub_state = "normal"
elif state == "disconnctd":
state = "disconnected"
if e != NULL:
if e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_TX_MSG:
tdata = e.body.tsx_state.src.tdata
if (tdata.msg.type == PJSIP_RESPONSE_MSG and tdata.msg.line.status.code == 487 and
state == "disconnected" and invitation.state in ["incoming", "early"]):
return
elif e.type == PJSIP_EVENT_RX_MSG:
rdata = e.body.rx_msg.rdata
elif e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_RX_MSG:
if (inv.state != PJSIP_INV_STATE_CONFIRMED or
e.body.tsx_state.src.rdata.msg_info.msg.type == PJSIP_REQUEST_MSG):
rdata = e.body.tsx_state.src.rdata
if rdata != NULL:
rdata_dict = dict()
_pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict)
if tdata != NULL:
tdata_dict = dict()
_pjsip_msg_to_dict(tdata.msg, tdata_dict)
try:
timer = StateCallbackTimer(state, sub_state, rdata_dict, tdata_dict)
timer.schedule(0, <timer_callback>invitation._cb_state, invitation)
except:
invitation._fail(ua)
except:
ua._handle_exception(1)
cdef void _Invitation_cb_sdp_done(pjsip_inv_session *inv, int status) with gil:
cdef Invitation invitation
cdef PJSIPUA ua
cdef SDPCallbackTimer timer
try:
ua = _get_ua()
except:
return
try:
if inv.mod_data[ua._module.id] != NULL:
invitation = (<object> inv.mod_data[ua._module.id])()
if invitation is None:
return
try:
timer = SDPCallbackTimer(status)
timer.schedule(0, <timer_callback>invitation._cb_sdp_done, invitation)
except:
invitation._fail(ua)
except:
ua._handle_exception(1)
cdef void _Invitation_cb_rx_reinvite(pjsip_inv_session *inv,
pjmedia_sdp_session_ptr_const offer, pjsip_rx_data *rdata) with gil:
cdef int status
cdef pjsip_tx_data *answer_tdata
cdef object rdata_dict = None
cdef Invitation invitation
cdef PJSIPUA ua
cdef StateCallbackTimer timer
try:
ua = _get_ua()
except:
return
try:
if inv.mod_data[ua._module.id] != NULL:
invitation = (<object> inv.mod_data[ua._module.id])()
if invitation is None:
return
if rdata != NULL:
rdata_dict = dict()
_pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict)
with nogil:
status = pjsip_inv_initial_answer(inv, rdata, 100, NULL, NULL, &answer_tdata)
if status != 0:
raise PJSIPError("Could not create initial (unused) response to re-INVITE", status)
with nogil:
pjsip_tx_data_dec_ref(answer_tdata)
try:
timer = StateCallbackTimer("connected", "received_proposal", rdata_dict, None)
timer.schedule(0, <timer_callback>invitation._cb_state, invitation)
except:
invitation._fail(ua)
except:
ua._handle_exception(1)
cdef void _Invitation_cb_tsx_state_changed(pjsip_inv_session *inv, pjsip_transaction *tsx, pjsip_event *e) with gil:
cdef pjsip_rx_data *rdata = NULL
cdef pjsip_tx_data *tdata = NULL
cdef object rdata_dict = None
cdef object tdata_dict = None
cdef Invitation invitation
cdef PJSIPUA ua
cdef StateCallbackTimer timer
try:
ua = _get_ua()
except:
return
try:
if tsx == NULL or e == NULL:
return
if e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_RX_MSG:
rdata = e.body.tsx_state.src.rdata
if e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_TX_MSG:
tdata = e.body.tsx_state.src.tdata
if inv.mod_data[ua._module.id] != NULL:
invitation = (<object> inv.mod_data[ua._module.id])()
if invitation is None:
return
if ((tsx.state == PJSIP_TSX_STATE_TERMINATED or tsx.state == PJSIP_TSX_STATE_COMPLETED) and
invitation._reinvite_transaction != NULL and invitation._reinvite_transaction == tsx):
if rdata != NULL:
rdata_dict = dict()
_pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict)
if tdata != NULL:
tdata_dict = dict()
_pjsip_msg_to_dict(tdata.msg, tdata_dict)
try:
timer = StateCallbackTimer("connected", "normal", rdata_dict, tdata_dict)
timer.schedule(0, <timer_callback>invitation._cb_state, invitation)
except:
invitation._fail(ua)
elif (invitation.state in ("incoming", "early") and invitation.direction == "incoming" and
rdata != NULL and rdata.msg_info.msg.type == PJSIP_REQUEST_MSG and
rdata.msg_info.msg.line.req.method.id == PJSIP_CANCEL_METHOD):
if rdata != NULL:
rdata_dict = dict()
_pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict)
try:
timer = StateCallbackTimer("disconnected", None, rdata_dict, None)
timer.schedule(0, <timer_callback>invitation._cb_state, invitation)
except:
invitation._fail(ua)
except:
ua._handle_exception(1)
cdef void _Invitation_cb_new(pjsip_inv_session *inv, pjsip_event *e) with gil:
# As far as I can tell this is never actually called!
pass
# Globals
#
cdef pjsip_inv_callback _inv_cb
_inv_cb.on_state_changed = _Invitation_cb_state
_inv_cb.on_media_update = _Invitation_cb_sdp_done
_inv_cb.on_rx_reinvite = _Invitation_cb_rx_reinvite
_inv_cb.on_tsx_state_changed = _Invitation_cb_tsx_state_changed
_inv_cb.on_new_session = _Invitation_cb_new
diff --git a/sipsimple/core/_core.pyx b/sipsimple/core/_core.pyx
index 057e8969..05b853d3 100644
--- a/sipsimple/core/_core.pyx
+++ b/sipsimple/core/_core.pyx
@@ -1,57 +1,57 @@
# Copyright (C) 2008-2010 AG Projects. See LICENSE for details.
#
# includes
include "_core.error.pxi"
include "_core.lib.pxi"
include "_core.sound.pxi"
include "_core.util.pxi"
include "_core.ua.pxi"
include "_core.event.pxi"
include "_core.request.pxi"
include "_core.helper.pxi"
include "_core.headers.pxi"
include "_core.subscription.pxi"
include "_core.invitation.pxi"
include "_core.sdp.pxi"
include "_core.mediatransport.pxi"
# constants
PJ_VERSION = pj_get_version()
PJ_SVN_REVISION = int(PJ_SVN_REV)
-CORE_REVISION = 102
+CORE_REVISION = 103
# exports
__all__ = ["PJ_VERSION", "PJ_SVN_REVISION", "CORE_REVISION",
"SIPCoreError", "PJSIPError", "PJSIPTLSError", "SIPCoreInvalidStateError",
"AudioMixer", "ToneGenerator", "RecordingWaveFile", "WaveFile", "MixerPort",
"sip_status_messages",
"BaseCredentials", "Credentials", "FrozenCredentials", "BaseSIPURI", "SIPURI", "FrozenSIPURI",
"BaseHeader", "Header", "FrozenHeader", "BaseContactHeader", "ContactHeader", "FrozenContactHeader",
"BaseIdentityHeader", "IdentityHeader", "FrozenIdentityHeader", "FromHeader", "FrozenFromHeader", "ToHeader", "FrozenToHeader",
"RouteHeader", "FrozenRouteHeader", "RecordRouteHeader", "FrozenRecordRouteHeader", "BaseRetryAfterHeader", "RetryAfterHeader", "FrozenRetryAfterHeader",
"BaseViaHeader", "ViaHeader", "FrozenViaHeader", "BaseWarningHeader", "WarningHeader", "FrozenWarningHeader",
"BaseEventHeader", "EventHeader", "FrozenEventHeader", "BaseSubscriptionStateHeader", "SubscriptionStateHeader", "FrozenSubscriptionStateHeader",
"BaseReasonHeader", "ReasonHeader", "FrozenReasonHeader",
"Request",
"Subscription",
"Invitation",
"SDPSession", "FrozenSDPSession", "SDPMediaStream", "FrozenSDPMediaStream", "SDPConnection", "FrozenSDPConnection", "SDPAttribute", "FrozenSDPAttribute",
"RTPTransport", "AudioTransport"]
# Initialize the GIL in the PyMODINIT function of the module.
# This is a hack because Cython does not support #ifdefs.
cdef extern from *:
cdef void emit_ifdef_with_thread "#ifdef WITH_THREAD //" ()
cdef void emit_endif "#endif //" ()
emit_ifdef_with_thread()
PyEval_InitThreads()
emit_endif()
diff --git a/sipsimple/session.py b/sipsimple/session.py
index c64a5fce..c72cb2fb 100644
--- a/sipsimple/session.py
+++ b/sipsimple/session.py
@@ -1,1419 +1,1439 @@
# Copyright (C) 2008-2010 AG Projects. See LICENSE for details.
#
"""
Implements an asynchronous notification based mechanism for
establishment, modification and termination of sessions using Session
Initiation Protocol (SIP) standardized in RFC3261.
"""
from __future__ import absolute_import, with_statement
from datetime import datetime
from threading import RLock
from application.notification import IObserver, Notification, NotificationCenter
from application.python.decorator import decorator, preserve_signature
from application.python.util import Singleton
from application.system import host
from eventlet import api, coros
from eventlet.coros import queue
from zope.interface import implements
from sipsimple.core import Engine, Invitation, PJSIPError, SIPCoreError, SIPCoreInvalidStateError, sip_status_messages
from sipsimple.core import ContactHeader, FromHeader, ReasonHeader, RouteHeader, WarningHeader
from sipsimple.core import SDPConnection, SDPMediaStream, SDPSession
from sipsimple.account import AccountManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError
from sipsimple.util import TimestampedNotificationData, run_in_green_thread, run_in_twisted_thread
class MediaStreamDidFailError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class InvitationDidFailError(Exception):
def __init__(self, invitation, data):
self.invitation = invitation
self.data = data
class IllegalStateError(RuntimeError):
pass
@decorator
def transition_state(required_state, new_state):
def state_transitioner(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
with obj._lock:
if obj.state != required_state:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
obj.state = new_state
return func(obj, *args, **kwargs)
return wrapper
return state_transitioner
@decorator
def check_state(required_states):
def state_checker(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
if obj.state not in required_states:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
return func(obj, *args, **kwargs)
return wrapper
return state_checker
class Session(object):
implements(IObserver)
media_stream_timeout = 15
def __init__(self, account):
self.account = account
self.direction = None
self.end_time = None
self.on_hold = False
self.proposed_streams = None
self.route = None
self.state = None
self.start_time = None
self.streams = None
self.transport = None
self.greenlet = None
self._channel = queue()
self._hold_in_progress = False
self._invitation = None
self._local_identity = None
self._remote_identity = None
self._lock = RLock()
def init_incoming(self, invitation):
notification_center = NotificationCenter()
remote_sdp = invitation.sdp.proposed_remote
self.proposed_streams = []
if remote_sdp:
for index, media_stream in enumerate(remote_sdp.media):
if media_stream.port != 0:
for stream_type in MediaStreamRegistry():
try:
stream = stream_type.new_from_sdp(self.account, remote_sdp, index)
except InvalidStreamError:
break
except UnknownStreamError:
continue
else:
stream.index = index
self.proposed_streams.append(stream)
break
if self.proposed_streams:
self.direction = 'incoming'
self.state = 'incoming'
self.transport = invitation.transport
self._invitation = invitation
notification_center.add_observer(self, sender=invitation)
notification_center.post_notification('SIPSessionNewIncoming', self, TimestampedNotificationData(streams=self.proposed_streams))
else:
invitation.send_response(488)
@transition_state(None, 'connecting')
@run_in_green_thread
def connect(self, to_header, routes, streams):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
connected = False
received_code = 0
received_reason = None
unhandled_notifications = []
self.direction = 'outgoing'
self.proposed_streams = streams
self.route = routes[0]
self.transport = self.route.transport
self._invitation = Invitation()
self._local_identity = FromHeader(self.account.uri, self.account.display_name)
self._remote_identity = to_header
notification_center.add_observer(self, sender=self._invitation)
notification_center.post_notification('SIPSessionNewOutgoing', self, TimestampedNotificationData(streams=streams))
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
local_ip = host.default_ip
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
stun_addresses = []
for index, stream in enumerate(self.proposed_streams):
stream.index = index
media = stream.get_local_media(for_offer=True)
local_sdp.media.append(media)
stun_addresses.extend((value.split(' ', 5)[4] for value in media.attributes.getall('candidate') if value.startswith('S ')))
if stun_addresses:
local_sdp.connection.address = stun_addresses[0]
self._invitation.send_invite(FromHeader(self.account.uri, self.account.display_name), to_header, RouteHeader(self.route.get_uri()),
ContactHeader(self.account.contact[self.route.transport]), local_sdp, self.account.credentials)
try:
with api.timeout(settings.sip.invite_timeout):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=received_code, reason=received_reason, error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData())
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connecting':
received_code = notification.data.code
received_reason = notification.data.reason
elif notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
else:
unhandled_notifications.append(notification)
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
except api.TimeoutError:
self.greenlet = None
self.end()
return
notification_center.post_notification('SIPSessionWillStart', self, TimestampedNotificationData())
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
invitation_notifications = []
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
invitation_notifications.append(notification)
[self._channel.send(notification) for notification in invitation_notifications]
while not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData())
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connecting':
received_code = notification.data.code
received_reason = notification.data.reason
elif notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
else:
unhandled_notifications.append(notification)
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
except (MediaStreamDidFailError, api.TimeoutError), e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', code=received_code, reason=received_reason, error=error)
except InvitationDidFailError, e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
# As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE':
notification_center.post_notification('SIPSessionWillEnd', self, TimestampedNotificationData(originator=e.data.originator))
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200]))
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
else:
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason))
code = e.data.code if e.data.originator == 'remote' else 0
reason = e.data.reason if e.data.originator == 'remote' else None
if e.data.originator == 'remote' and code // 100 == 3:
redirect_identities = e.data.headers.get('Contact', [])
else:
redirect_identities = None
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
self.greenlet = None
except SIPCoreError, e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=received_code, reason=received_reason, error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification_center.post_notification('SIPSessionDidStart', self, TimestampedNotificationData(streams=self.streams))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@check_state(['incoming', 'received_proposal'])
@run_in_green_thread
def send_ring_indication(self):
try:
self._invitation.send_response(180)
except SIPCoreInvalidStateError:
pass # The INVITE session might have already been cancelled; ignore the error
@transition_state('incoming', 'accepting')
@run_in_green_thread
def accept(self, streams):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
connected = False
unhandled_notifications = []
if self.proposed_streams:
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
else:
for index, stream in enumerate(streams):
notification_center.add_observer(self, sender=stream)
stream.index = index
stream.initialize(self, direction='outgoing')
self.proposed_streams = streams
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
local_ip = host.default_ip
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
stun_addresses = []
if self._invitation.sdp.proposed_remote:
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, media in enumerate(self._invitation.sdp.proposed_remote.media):
stream = stream_map.get(index, None)
if stream is not None:
media = stream.get_local_media(for_offer=False)
local_sdp.media.append(media)
stun_addresses.extend((value.split(' ', 5)[4] for value in media.attributes.getall('candidate') if value.startswith('S ')))
else:
media = SDPMediaStream.new(media)
media.port = 0
local_sdp.media.append(media)
else:
for stream in self.proposed_streams:
media = stream.get_local_media(for_offer=True)
local_sdp.media.append(media)
stun_addresses.extend((value.split(' ', 5)[4] for value in media.attributes.getall('candidate') if value.startswith('S ')))
if stun_addresses:
local_sdp.connection.address = stun_addresses[0]
self._invitation.send_response(200, sdp=local_sdp)
notification_center.post_notification('SIPSessionWillStart', self, TimestampedNotificationData())
local_sdp = self._invitation.sdp.active_local
remote_sdp = self._invitation.sdp.active_remote
# while entered only if initial INVITE did not contain SDP and we are waiting for the ACK which should contain it
while remote_sdp is None or local_sdp is None:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
if not connected:
# we could not have got a SIPInvitationGotSDPUpdate if we did not get an ACK
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True))
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=200, reason=sip_status_messages[200], error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True))
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
wait_count = 0
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map.get(index, None)
if stream is not None:
if remote_media.port:
wait_count += 1
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
with api.timeout(self.media_stream_timeout):
while wait_count > 0 or not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True))
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
except (MediaStreamDidFailError, api.TimeoutError), e:
if self._invitation.state == 'connecting':
ack_received = False if isinstance(e, api.TimeoutError) and wait_count == 0 else 'unknown'
# pjsip's invite session object does not inform us whether the ACK was received or not
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=ack_received))
elif self._invitation.state == 'connected' and not connected:
# we didn't yet get to process the SIPInvitationChangedState (state -> connected) notification
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True))
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
code = 200 if self._invitation.state not in ('incoming', 'early') else 0
reason = sip_status_messages[200] if self._invitation.state not in ('incoming', 'early') else None
reason_header = None
if isinstance(e, api.TimeoutError) and wait_count > 0:
error = 'media stream timed out while starting'
elif isinstance(e, api.TimeoutError) and wait_count == 0:
error = 'No ACK received'
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'Missing ACK'
else:
error = 'media stream failed: %s' % e.data.reason
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = e.data.reason
self.start_time = datetime.now()
self._fail(originator='local', code=code, reason=reason, error=error, reason_header=reason_header)
except InvitationDidFailError, e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
if e.data.prev_state in ('incoming', 'early'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None))
elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=False))
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None))
else:
notification_center.post_notification('SIPSessionWillEnd', self, TimestampedNotificationData(originator='remote'))
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method=e.data.method, code=200, reason='OK'))
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator='remote', end_reason=e.data.disconnect_reason))
self.greenlet = None
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
except SIPCoreError, e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
code = 200 if self._invitation.state not in ('incoming', 'early') else 0
reason = sip_status_messages[200] if self._invitation.state not in ('incoming', 'early') else None
self._fail(originator='local', code=code, reason=reason, error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification_center.post_notification('SIPSessionDidStart', self, TimestampedNotificationData(streams=self.streams))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('incoming', 'terminating')
@run_in_green_thread
def reject(self, code=603, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'disconnected':
ack_received = notification.data.disconnect_reason != 'missing ACK'
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=ack_received))
break
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
notification_center.remove_observer(self, sender=self._invitation)
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
except SIPCoreError, e:
code = 200 if self._invitation.state not in ('incoming', 'early') else 0
reason = sip_status_messages[200] if self._invitation.state not in ('incoming', 'early') else None
self._fail(originator='local', code=code, reason=reason, error='SIP core error: %s' % str(e))
except api.TimeoutError:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=False))
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None))
else:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None))
@transition_state('received_proposal', 'accepting_proposal')
@run_in_green_thread
def accept_proposal(self, streams):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
streams = [stream for stream in streams if stream in self.proposed_streams]
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
try:
wait_count = len(streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
stream_map = dict((stream.index, stream) for stream in streams)
for index, media in enumerate(self._invitation.sdp.proposed_remote.media):
stream = stream_map.get(index, None)
if stream is not None:
if index < len(local_sdp.media):
local_sdp.media[index] = stream.get_local_media(for_offer=False)
else:
local_sdp.media.append(stream.get_local_media(for_offer=False))
elif index >= len(local_sdp.media): # actually == is sufficient
media = SDPMediaStream.new(media)
media.port = 0
local_sdp.media.append(media)
self._invitation.send_response(200, sdp=local_sdp)
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
else:
self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = True
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, TimestampedNotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
for stream in streams:
stream.start(local_sdp, remote_sdp, stream.index)
with api.timeout(self.media_stream_timeout):
wait_count = len(streams)
while wait_count > 0 or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
else:
unhandled_notifications.append(notification)
except (MediaStreamDidFailError, api.TimeoutError), e:
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail_proposal(originator='remote', error=error)
except InvitationDidFailError, e:
self._fail_proposal(originator='remote', error='session ended')
self.handle_notification(Notification('SIPInvitationChangedState', e.invitation, e.data))
except SIPCoreError, e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionGotAcceptProposal', self, TimestampedNotificationData(originator='remote', streams=streams, proposed_streams=self.proposed_streams))
self.streams = self.streams + streams
proposed_streams = self.proposed_streams
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, TimestampedNotificationData(originator='remote', action='add', streams=streams))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('received_proposal', 'rejecting_proposal')
@run_in_green_thread
def reject_proposal(self, code=488, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1, None):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received='unknown'))
except SIPCoreError, e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionGotRejectProposal', self, TimestampedNotificationData(originator='remote', code=code, reason=sip_status_messages[code], streams=self.proposed_streams))
self.proposed_streams = None
if self._hold_in_progress:
self._send_hold()
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def add_stream(self, stream):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
received_code = None
received_reason = None
unhandled_notifications = []
self.proposed_streams = [stream]
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
while True:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
break
elif notification.name == 'SIPInvitationChangedState':
# This is actually the only reason for which this notification could be received
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self._fail_proposal(originator='local', error='received stream proposal')
self.handle_notification(notification)
return
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
stream.index = len(local_sdp.media)
local_sdp.media.append(stream.get_local_media(for_offer=True))
self._invitation.send_reinvite(sdp=local_sdp)
notification_center.post_notification('SIPSessionGotProposal', self, TimestampedNotificationData(originator='local', streams=self.proposed_streams))
received_invitation_state = False
received_sdp_update = False
try:
with api.timeout(settings.sip.invite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for s in self.streams:
s.update(local_sdp, remote_sdp, s.index)
else:
self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData())
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if 200 <= notification.data.code < 300:
received_code = notification.data.code
received_reason = notification.data.reason
else:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
notification_center.post_notification('SIPSessionGotRejectProposal', self, TimestampedNotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, streams=self.proposed_streams))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
return
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
except api.TimeoutError:
self.greenlet = None
self.cancel_proposal()
return
try:
remote_media = remote_sdp.media[stream.index]
except IndexError:
self._fail_proposal(originator='local', error='SDP media missing in answer')
return
else:
if remote_media.port:
stream.start(local_sdp, remote_sdp, stream.index)
else:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
notification_center.post_notification('SIPSessionGotRejectProposal', self, TimestampedNotificationData(originator='local', code=received_code, reason=received_reason, streams=self.proposed_streams))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
return
with api.timeout(self.media_stream_timeout):
wait_count = 1
while wait_count > 0 or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
except (MediaStreamDidFailError, api.TimeoutError), e:
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail_proposal(originator='local', error=error)
except InvitationDidFailError, e:
self._fail_proposal(originator='local', error='session ended')
self.handle_notification(Notification('SIPInvitationChangedState', e.invitation, e.data))
except SIPCoreError, e:
self._fail_proposal(originator='local', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionGotAcceptProposal', self, TimestampedNotificationData(originator='local', streams=self.proposed_streams, proposed_streams=self.proposed_streams))
self.streams = self.streams + self.proposed_streams
proposed_streams = self.proposed_streams
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, TimestampedNotificationData(originator='local', action='add', streams=proposed_streams))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def remove_stream(self, stream):
if stream not in self.streams:
self.state = 'connected'
return
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
self.streams.remove(stream)
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
local_sdp.media[stream.index].port = 0
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData())
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
except InvitationDidFailError, e:
self.greenlet = None
self.handle_notification(Notification('SIPInvitationChangedState', e.invitation, e.data))
except SIPCoreError:
raise #FIXME
else:
stream.end()
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, TimestampedNotificationData(originator='local', action='remove', streams=[stream]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('sending_proposal', 'cancelling_proposal')
@run_in_green_thread
def cancel_proposal(self):
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.cancel_reinvite()
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if notification.data.code == 487:
if self.proposed_streams:
for stream in self.proposed_streams:
stream.deactivate()
stream.end()
notification_center.post_notification('SIPSessionGotRejectProposal', self, TimestampedNotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, streams=self.proposed_streams))
elif notification.data.code == 200:
self.end()
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
break
except SIPCoreError, e:
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
except InvitationDidFailError, e:
self.proposed_streams = None
self.greenlet = None
self.handle_notification(Notification('SIPInvitationChangedState', e.invitation, e.data))
else:
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
finally:
if self._hold_in_progress:
self._send_hold()
@run_in_green_thread
def hold(self):
if self.on_hold or self._hold_in_progress:
return
self._hold_in_progress = True
streams = self.streams if self.streams is not None else self.proposed_streams
if not streams:
return
for stream in streams:
stream.hold()
if self.state == 'connected':
self._send_hold()
@run_in_green_thread
def unhold(self):
if not self.on_hold and not self._hold_in_progress:
return
self._hold_in_progress = False
streams = self.streams if self.streams is not None else self.proposed_streams
if not streams:
return
for stream in streams:
stream.unhold()
if self.state == 'connected':
self._send_unhold()
@run_in_green_thread
def end(self):
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
if self._invitation is None or self._invitation.state in (None, 'disconnecting', 'disconnected'):
return
self.state = 'terminating'
if self._invitation.state == 'connected':
notification_center.post_notification('SIPSessionWillEnd', self, TimestampedNotificationData(originator='local'))
streams = (self.streams or []) + (self.proposed_streams or [])
for stream in streams[:]:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
streams.remove(stream)
else:
stream.deactivate()
cancelling = self._invitation.state != 'connected'
try:
self._invitation.end(timeout=1)
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
if cancelling:
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif hasattr(notification.data, 'method'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator='remote', method=notification.data.method, code=200, reason=sip_status_messages[200]))
elif notification.data.disconnect_reason == 'user request':
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator='local', method='BYE', code=notification.data.code, reason=notification.data.reason))
break
except SIPCoreError, e:
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
else:
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator='local', end_reason='SIP core error: %s' % str(e)))
return
finally:
for stream in streams:
stream.end()
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
else:
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator='local', end_reason='user request'))
@property
def local_identity(self):
if self._invitation is not None and self._invitation.local_identity is not None:
return self._invitation.local_identity
else:
return self._local_identity
@property
def remote_identity(self):
if self._invitation is not None and self._invitation.remote_identity is not None:
return self._invitation.remote_identity
else:
return self._remote_identity
@property
def remote_user_agent(self):
return self._invitation.remote_user_agent if self._invitation is not None else None
def _send_hold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(for_offer=True)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData())
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
except InvitationDidFailError, e:
self.greenlet = None
self.handle_notification(Notification('SIPInvitationChangedState', e.invitation, e.data))
except SIPCoreError, e:
raise #FIXME
else:
self.greenlet = None
self.on_hold = True
self.state = 'connected'
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, TimestampedNotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams)))
for notification in unhandled_notifications:
self.handle_notification(notification)
if not self._hold_in_progress:
for stream in self.streams:
stream.unhold()
self._send_unhold()
else:
self._hold_in_progress = False
def _send_unhold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(for_offer=True)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData())
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
+ elif notification.data.state == 'disconnected':
+ raise InvitationDidFailError(notification.sender, notification.data)
except InvitationDidFailError, e:
self.greenlet = None
self.handle_notification(Notification('SIPInvitationChangedState', e.invitation, e.data))
except SIPCoreError, e:
raise #FIXME
else:
self.greenlet = None
self.on_hold = False
self.state = 'connected'
notification_center.post_notification('SIPSessionDidChangeHoldState', self, TimestampedNotificationData(originator='local', on_hold=False, partial=False))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
for stream in self.streams:
stream.hold()
self._send_hold()
def _fail(self, originator, code, reason, error, reason_header=None):
notification_center = NotificationCenter()
prev_inv_state = self._invitation.state
self.state = 'terminating'
if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'):
notification_center.post_notification('SIPSessionWillEnd', self, TimestampedNotificationData(originator=originator))
if self._invitation.state not in (None, 'disconnecting', 'disconnected'):
try:
if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'):
self._invitation.send_response(500, extra_headers=[reason_header] if reason_header is not None else [])
else:
self._invitation.end(extra_headers=[reason_header] if reason_header is not None else [])
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
if prev_inv_state in ('connecting', 'connected'):
if notification.data.disconnect_reason in ('timeout', 'missing ACK'):
code = 200
reason = 'OK'
originator = 'local'
elif hasattr(notification.data, 'method'):
code = 200
reason = 'OK'
originator = 'remote'
else:
code = notification.data.code
reason = notification.data.reason
originator = 'local'
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
TimestampedNotificationData(originator=originator, method='BYE', code=code, reason=reason))
elif self._invitation.direction == 'incoming' and prev_inv_state in ('incoming', 'early'):
ack_received = notification.data.disconnect_reason != 'missing ACK'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=500, reason=sip_status_messages[500], ack_received=ack_received))
elif self._invitation.direction == 'outgoing' and prev_inv_state in ('outgoing', 'early'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=487, reason='Session Cancelled'))
break
except SIPCoreError:
pass
except api.TimeoutError:
if prev_inv_state in ('connecting', 'connected'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='BYE', code=408, reason=sip_status_messages[408]))
notification_center.remove_observer(self, sender=self._invitation)
self.state = 'terminated'
if prev_inv_state in (None, 'incoming', 'outgoing', 'early', 'connecting'):
if self._invitation.direction == 'incoming':
code = code or 500
reason = reason or sip_status_messages[500]
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None))
else:
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator=originator, end_reason=error))
self.greenlet = None
def _fail_proposal(self, originator, error):
notification_center = NotificationCenter()
for stream in self.proposed_streams:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
# _fail_proposal can be called from reject_proposal, which means the stream will
# not have been initialized or the session registered as an observer for it.
pass
else:
stream.deactivate()
stream.end()
if originator == 'remote' and self._invitation.sub_state == 'received_proposal':
self._invitation.send_response(500)
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=500, reason=sip_status_messages[500], ack_received='unknown'))
notification_center.post_notification('SIPSessionHadProposalFailure', self, TimestampedNotificationData(originator=originator, failure_reason=error, streams=self.proposed_streams))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
@run_in_green_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, None)
if handler is not None:
handler(notification)
def _NH_SIPInvitationChangedState(self, notification):
notification_center = NotificationCenter()
if self.greenlet is not None:
if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting':
self._channel.send_exception(InvitationDidFailError(notification.sender, notification.data))
else:
self._channel.send(notification)
else:
self.greenlet = api.getcurrent()
try:
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self.state = 'received_proposal'
try:
proposed_remote_sdp = self._invitation.sdp.proposed_remote
active_remote_sdp = self._invitation.sdp.active_remote
for stream in self.streams:
if not stream.validate_update(proposed_remote_sdp, stream.index):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)])
self.state = 'connected'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
# These tests are here because some ALGs mess up the SDP and the behaviour
# of pjsip in these situations is unexpected (eg. loss of audio). -Luci
for attr in ('user', 'net_type', 'address_type', 'address'):
if getattr(proposed_remote_sdp, attr) != getattr(active_remote_sdp, attr):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Difference in contents of o= line')])
self.state = 'connected'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
added_media_indexes = set()
removed_media_indexes = set()
for index, media_stream in enumerate(proposed_remote_sdp.media):
if index >= len(active_remote_sdp.media):
added_media_indexes.add(index)
elif media_stream.media != active_remote_sdp.media[index].media:
added_media_indexes.add(index)
removed_media_indexes.add(index)
elif not media_stream.port and active_remote_sdp.media[index].port:
removed_media_indexes.add(index)
removed_media_indexes.update(xrange(len(proposed_remote_sdp.media), len(active_remote_sdp.media)))
if added_media_indexes and removed_media_indexes:
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')])
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
elif added_media_indexes:
self.proposed_streams = []
for index in added_media_indexes:
media_stream = proposed_remote_sdp.media[index]
if media_stream.port != 0:
for stream_type in MediaStreamRegistry():
try:
stream = stream_type.new_from_sdp(self.account, proposed_remote_sdp, index)
except InvalidStreamError:
break
except UnknownStreamError:
continue
else:
stream.index = index
self.proposed_streams.append(stream)
break
if self.proposed_streams:
self._invitation.send_response(100)
notification_center.post_notification('SIPSessionGotProposal', sender=self, data=TimestampedNotificationData(originator='remote', streams=self.proposed_streams))
return
else:
self._invitation.send_response(488)
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
else:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes]
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
local_sdp.media[stream.index].port = 0
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(for_offer=False)
try:
self._invitation.send_response(200, sdp=local_sdp)
except PJSIPError, e:
if 'PJMEDIA_SDPNEG' in str(e):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Changing the codec of an audio stream is currently not supported')])
self.state = 'connected'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
else:
raise
else:
for stream in removed_streams:
self.streams.remove(stream)
stream.end()
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = False
received_sdp_update = False
while not received_sdp_update or not received_invitation_state:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, TimestampedNotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
if removed_media_indexes:
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, TimestampedNotificationData(originator='remote', action='remove', streams=removed_streams))
except InvitationDidFailError, e:
self.greenlet = None
self.state == 'connected'
self.handle_notification(Notification('SIPInvitationChangedState', e.invitation, e.data))
except SIPCoreError:
raise #FIXME
else:
self.state = 'connected'
elif notification.data.state == 'connected' and notification.data.sub_state == 'normal' and notification.data.prev_sub_state == 'received_proposal':
if notification.data.originator == 'local' and notification.data.code == 487:
self.state = 'connected'
notification_center.post_notification('SIPSessionGotRejectProposal', self, TimestampedNotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, streams=self.proposed_streams))
self.proposed_streams = None
if self._hold_in_progress:
self._send_hold()
elif notification.data.state == 'disconnected':
if self.state == 'incoming':
self.state = 'terminated'
if notification.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
# There must have been an error involved
notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
notification_center.post_notification('SIPSessionWillEnd', self, TimestampedNotificationData(originator=notification.data.originator))
for stream in self.streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
if notification.data.originator == 'remote':
if hasattr(notification.data, 'method'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator=notification.data.originator, method=notification.data.method, code=200, reason=sip_status_messages[200]))
else:
notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator=notification.data.originator, method='INVITE', code=notification.data.code, reason=notification.data.reason))
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason))
notification_center.remove_observer(self, sender=self._invitation)
finally:
self.greenlet = None
def _NH_SIPInvitationGotSDPUpdate(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidInitialize(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidStart(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidFail(self, notification):
if self.greenlet is not None:
if self.state not in ('terminating', 'terminated'):
self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data))
else:
stream = notification.sender
if self.streams == [stream]:
self.greenlet = None
self.end()
else:
try:
self.remove_stream(stream)
except IllegalStateError:
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=stream)
self.streams.remove(stream)
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, TimestampedNotificationData(originator='remote', action='remove', streams=[stream]))
class SessionManager(object):
__metaclass__ = Singleton
implements(IObserver)
def __init__(self):
self.sessions = []
self.state = None
self._channel = coros.queue()
def start(self):
self.state = 'starting'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillStart', self, TimestampedNotificationData())
notification_center.add_observer(self, 'SIPInvitationChangedState')
notification_center.add_observer(self, 'SIPSessionNewIncoming')
notification_center.add_observer(self, 'SIPSessionNewOutgoing')
notification_center.add_observer(self, 'SIPSessionDidFail')
notification_center.add_observer(self, 'SIPSessionDidEnd')
self.state = 'started'
notification_center.post_notification('SIPSessionManagerDidStart', self, TimestampedNotificationData())
def stop(self):
self.state = 'stopping'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillEnd', self, TimestampedNotificationData())
for session in self.sessions:
session.end()
while self.sessions:
self._channel.wait()
notification_center.remove_observer(self, 'SIPInvitationChangedState')
notification_center.remove_observer(self, 'SIPSessionNewIncoming')
notification_center.remove_observer(self, 'SIPSessionNewOutgoing')
notification_center.remove_observer(self, 'SIPSessionDidFail')
notification_center.remove_observer(self, 'SIPSessionDidEnd')
self.state = 'stopped'
notification_center.post_notification('SIPSessionManagerDidEnd', self, TimestampedNotificationData())
@run_in_twisted_thread
def handle_notification(self, notification):
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming':
account_manager = AccountManager()
account = account_manager.find_account(notification.data.request_uri)
if account is None:
notification.sender.send_response(404)
return
notification.sender.send_response(100)
session = Session(account)
session.init_incoming(notification.sender)
elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'):
self.sessions.append(notification.sender)
elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'):
self.sessions.remove(notification.sender)
if self.state == 'stopping':
self._channel.send(notification)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Feb 1, 7:01 PM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3488213
Default Alt Text
(132 KB)

Event Timeline