diff --git a/sylk/applications/webrtcgateway/handler.py b/sylk/applications/webrtcgateway/handler.py index 2b6e193..b2111e5 100644 --- a/sylk/applications/webrtcgateway/handler.py +++ b/sylk/applications/webrtcgateway/handler.py @@ -1,2293 +1,2297 @@ import base64 import hashlib import json import random import os import time import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from application.python.weakref import defaultweakobjectmap from application.system import makedirs, unlink from collections import deque from eventlib import coros, proc from itertools import count from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, FromHeader, ToHeader, Credentials, Message, RouteHeader, Route, Header from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads.imdn import IMDNDocument, DeliveryNotification, DisplayNotification from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import CPIMPayload, CPIMParserError, ChatIdentity, CPIMHeader, CPIMNamespace from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import call_in_green_thread, run_in_green_thread from sipsimple.util import ISOTimestamp from shutil import copyfileobj, rmtree from twisted.internet import reactor, defer from typing import Generic, Container, Iterable, Sized, TypeVar, Dict, Set, Optional, Union from werkzeug.exceptions import InternalServerError from zope.interface import implementer from sylk.accounts import DefaultAccount from sylk.configuration import SIPConfig from sylk.session import Session from . import push from .configuration import GeneralConfig, get_room_config, ExternalAuthConfig, JanusConfig from .janus import JanusBackend, JanusError, JanusSession, SIPPluginHandle, VideoroomPluginHandle from .logger import ConnectionLogger, VideoroomLogger from .models import sylkrtc, janus from .storage import TokenStorage, MessageStorage from .auth import AuthHandler class AccountInfo(object): # noinspection PyShadowingBuiltins def __init__(self, id, password, display_name=None, user_agent=None, incoming_header_prefixes=None): self.id = id self.password = password self.display_name = display_name self.user_agent = user_agent self.registration_state = None self.janus_handle = None # type: Optional[SIPPluginHandle] self.contact_params = {} self.incoming_header_prefixes = incoming_header_prefixes.__data__ if incoming_header_prefixes is not None else [] self.auth_handle = None self.auth_state = None @property def uri(self): return 'sip:' + self.id @property def user_data(self): return dict(username=self.uri, display_name=self.display_name, user_agent=self.user_agent, ha1_secret=self.password, contact_params=self.contact_params, incoming_header_prefixes=self.incoming_header_prefixes) class SessionPartyIdentity(object): def __init__(self, uri, display_name=None): self.uri = uri self.display_name = display_name # todo: might need to replace this auto-resetting descriptor with a timer in case we need to know when the slow link state expired class SlowLinkState(object): def __init__(self): self.slow_link = False self.last_reported = 0 class SlowLinkDescriptor(object): __timeout__ = 30 # 30 seconds def __init__(self): self.values = defaultweakobjectmap(SlowLinkState) def __get__(self, instance, owner): if instance is None: return self state = self.values[instance] if state.slow_link and time.time() - state.last_reported > self.__timeout__: state.slow_link = False return state.slow_link def __set__(self, instance, value): state = self.values[instance] if value: state.last_reported = time.time() state.slow_link = bool(value) def __delete__(self, instance): raise AttributeError('Attribute cannot be deleted') class SIPSessionInfo(object): slow_download = SlowLinkDescriptor() slow_upload = SlowLinkDescriptor() # noinspection PyShadowingBuiltins def __init__(self, id): self.id = id self.direction = None self.state = None self.account = None # type: Optional[AccountInfo] self.local_identity = None # type: Optional[SessionPartyIdentity] self.remote_identity = None # type: Optional[SessionPartyIdentity] self.janus_handle = None # type: Optional[SIPPluginHandle] self.slow_download = False self.slow_upload = False self._message_queue = deque() def init_outgoing(self, account, destination): self.account = account self.direction = 'outgoing' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account.id) self.remote_identity = SessionPartyIdentity(destination) def init_incoming(self, account, originator, originator_display_name=''): self.account = account self.direction = 'incoming' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account.id) self.remote_identity = SessionPartyIdentity(originator, originator_display_name) class VideoroomSessionInfo(object): slow_download = SlowLinkDescriptor() slow_upload = SlowLinkDescriptor() # noinspection PyShadowingBuiltins def __init__(self, id, owner, janus_handle): self.type = None # publisher / subscriber self.id = id self.owner = owner # type: ConnectionHandler self.janus_handle = janus_handle # type: VideoroomPluginHandle self.chat_handler = None # type: Optional[VideoroomChatHandler] self.account = None # type: Optional[AccountInfo] self.room = None # type: Optional[Videoroom] self.bitrate = None self.parent_session = None # type: Optional[VideoroomSessionInfo] # for subscribers this is their main session (the one used to join), for publishers is None self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers self.slow_download = False self.slow_upload = False self.feeds = PublisherFeedContainer() # keeps references to all the other participant's publisher feeds that we subscribed to def init_publisher(self, account, room): self.type = 'publisher' self.account = account self.room = room self.bitrate = room.config.max_bitrate self.chat_handler = VideoroomChatHandler(session=self) def init_subscriber(self, publisher_session, parent_session): assert publisher_session.type == parent_session.type == 'publisher' self.type = 'subscriber' self.publisher_id = publisher_session.id self.parent_session = parent_session self.account = parent_session.account self.room = parent_session.room self.bitrate = self.room.config.max_bitrate def __repr__(self): return '<{0.__class__.__name__}: type={0.type!r} id={0.id!r} janus_handle={0.janus_handle!r}>'.format(self) class PublisherFeedContainer(object): """A container for the other participant's publisher sessions that we have subscribed to""" def __init__(self): self._publishers = set() self._id_map = {} # map publisher.id -> publisher and publisher.publisher_id -> publisher def add(self, session): assert session not in self._publishers assert session.id not in self._id_map and session.publisher_id not in self._id_map self._publishers.add(session) self._id_map[session.id] = self._id_map[session.publisher_id] = session def discard(self, item): # item can be any of session, session.id or session.publisher_id session = self._id_map[item] if item in self._id_map else item if item in self._publishers else None if session is not None: self._publishers.discard(session) self._id_map.pop(session.id, None) self._id_map.pop(session.publisher_id, None) def remove(self, item): # item can be any of session, session.id or session.publisher_id session = self._id_map[item] if item in self._id_map else item self._publishers.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.publisher_id) def pop(self, item): # item can be any of session, session.id or session.publisher_id session = self._id_map[item] if item in self._id_map else item self._publishers.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.publisher_id) return session def clear(self): self._publishers.clear() self._id_map.clear() def __len__(self): return len(self._publishers) def __iter__(self): return iter(self._publishers) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._publishers class Videoroom(object): def __init__(self, uri, audio, video): self.id = random.getrandbits(32) # janus needs numeric room names self.uri = uri self.audio = audio self.video = video self.config = get_room_config(uri) self.log = VideoroomLogger(self) self._active_participants = [] self._sessions = set() # type: Set[VideoroomSessionInfo] self._id_map = {} # type: Dict[Union[str, int], VideoroomSessionInfo] # map session.id -> session and session.publisher_id -> session self._shared_files = [] self._raised_hands = [] if self.config.record: makedirs(self.config.recording_dir, 0o755) self.log.info('created (recording on)') else: self.log.info('created') if self.config.video_disabled: self.video = False if self.config.persistent: self.read_files_from_disk() @property def active_participants(self): return self._active_participants @active_participants.setter def active_participants(self, participant_list): unknown_participants = set(participant_list).difference(self._id_map) if unknown_participants: raise ValueError('unknown participant session id: {}'.format(', '.join(unknown_participants))) if self._active_participants != participant_list: self._active_participants = participant_list self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) self._update_bitrate() @property def raised_hands(self): return self._raised_hands @raised_hands.setter def raised_hands(self, session_id): if session_id in self._raised_hands: self.log.info('{session} lowers hand '.format(session=session_id)) self._raised_hands.remove(session_id) else: self.log.info('{session} raises hand '.format(session=session_id)) self._raised_hands.append(session_id) def add(self, session): assert session not in self._sessions assert session.publisher_id is not None assert session.publisher_id not in self._id_map and session.id not in self._id_map self._sessions.add(session) self._id_map[session.id] = self._id_map[session.publisher_id] = session self.log.info('{session.account.id} has joined'.format(session=session)) self._update_bitrate() if self._active_participants: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) if self._shared_files: session.owner.send(sylkrtc.VideoroomFileSharingEvent(session=session.id, files=self._shared_files)) if self._raised_hands: session.owner.send(sylkrtc.VideoroomRaisedHandsEvent(session=session.id, raised_hands=self._raised_hands)) if self.config.invite_participants and len(self._sessions) == 1: originator = sylkrtc.SIPIdentity(uri=session.account.id, display_name=session.account.display_name) for participant in self.config.invite_participants: if session.account.id != participant: push.conference_invite(originator=originator, destination=participant, room=self.uri, call_id=session.id, audio=self.audio, video=self.video) # noinspection DuplicatedCode def discard(self, session): if session in self._sessions: self._sessions.discard(session) self._id_map.pop(session.id, None) self._id_map.pop(session.publisher_id, None) self.log.info('{session.account.id} has left'.format(session=session)) if session.id in self._active_participants: self._active_participants.remove(session.id) self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) for session in self._sessions: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) self._update_bitrate() # noinspection DuplicatedCode def remove(self, session): self._sessions.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.publisher_id) self.log.info('{session.account.id} has left'.format(session=session)) if session.id in self._active_participants: self._active_participants.remove(session.id) self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) for session in self._sessions: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) self._update_bitrate() def clear(self): for session in self._sessions: self.log.info('{session.account.id} has left'.format(session=session)) self._active_participants = [] self._shared_files = [] self._sessions.clear() self._id_map.clear() def allow_uri(self, uri): config = self.config if config.access_policy == 'allow,deny': return config.allow.match(uri) and not config.deny.match(uri) else: return not config.deny.match(uri) or config.allow.match(uri) def add_file(self, upload_request): self._write_file(upload_request) def get_file(self, filename): path = os.path.join(self.config.filesharing_dir, filename) if os.path.exists(path): return path else: raise LookupError('file does not exist') @staticmethod def _fix_path(path): name, extension = os.path.splitext(path) for x in count(0, step=-1): path = '{}{}{}'.format(name, x or '', extension) if not os.path.exists(path) and not os.path.islink(path): return path @run_in_thread('file-io') def _write_file(self, upload_request): makedirs(self.config.filesharing_dir) path = self._fix_path(os.path.join(self.config.filesharing_dir, upload_request.shared_file.filename)) upload_request.shared_file.filename = os.path.basename(path) meta_path = os.path.join(self.config.filesharing_dir, f'meta-{upload_request.shared_file.filename}') try: with open(path, 'wb') as output_file: copyfileobj(upload_request.content, output_file) with open(meta_path, 'w+') as output_file: output_file.write(json.dumps(upload_request.shared_file.__data__)) except (OSError, IOError): upload_request.had_error = True unlink(path) self._write_file_done(upload_request) @run_in_twisted_thread def _write_file_done(self, upload_request): if upload_request.had_error: upload_request.deferred.errback(InternalServerError('could not save file')) else: self._shared_files.append(upload_request.shared_file) for session in self._sessions: session.owner.send(sylkrtc.VideoroomFileSharingEvent(session=session.id, files=[upload_request.shared_file])) upload_request.deferred.callback('OK') @run_in_thread('file-io') def read_files_from_disk(self): with os.scandir(self.config.filesharing_dir) as file_list: for entry in file_list: if not entry.name.startswith('.') and entry.is_file() and not entry.name.startswith('meta-'): try: with open(os.path.join(self.config.filesharing_dir, f'meta-{entry.name}'), 'r') as f: content = f.read() except (OSError, IOError): continue try: test = json.loads(content) except (json.JSONDecodeError): continue shared_file = sylkrtc.SharedFile(**test) self._shared_files.append(shared_file) def cleanup(self): if not self.config.persistent: self._remove_files() @run_in_thread('file-io') def _remove_files(self): rmtree(self.config.filesharing_dir, ignore_errors=True) def _update_bitrate(self): if self._sessions: if self._active_participants: # todo: should we use max_bitrate / 2 or max_bitrate for each active participant if there are 2 active participants? active_participant_bitrate = self.config.max_bitrate // len(self._active_participants) other_participant_bitrate = 100000 self.log.debug('participant bitrate is {} (active) / {} (others)'.format(active_participant_bitrate, other_participant_bitrate)) for session in self._sessions: if session.id in self._active_participants: bitrate = active_participant_bitrate else: bitrate = other_participant_bitrate if session.bitrate != bitrate: session.bitrate = bitrate session.janus_handle.message(janus.VideoroomUpdatePublisher(bitrate=bitrate), _async=True) else: bitrate = self.config.max_bitrate // limit(len(self._sessions) - 1, min=1) self.log.debug('participant bitrate is {}'.format(bitrate)) for session in self._sessions: if session.bitrate != bitrate: session.bitrate = bitrate session.janus_handle.message(janus.VideoroomUpdatePublisher(bitrate=bitrate), _async=True) # todo: make Videoroom be a context manager that is retained/released on enter/exit and implement __nonzero__ to be different from __len__ # todo: so that a videoroom is not accidentally released by the last participant leaving while a new participant waits to join # todo: this needs a new model for communication with janus and the client that is pseudo-synchronous (uses green threads) def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._sessions SessionT = TypeVar('SessionT', SIPSessionInfo, VideoroomSessionInfo) class SessionContainer(Sized, Iterable[SessionT], Container[SessionT], Generic[SessionT]): def __init__(self): self._sessions = set() self._id_map = {} # map session.id -> session and session.janus_handle.id -> session def add(self, session): assert session not in self._sessions assert session.id not in self._id_map and session.janus_handle.id not in self._id_map self._sessions.add(session) self._id_map[session.id] = self._id_map[session.janus_handle.id] = session def discard(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item if item in self._sessions else None if session is not None: self._sessions.discard(session) self._id_map.pop(session.id, None) self._id_map.pop(session.janus_handle.id, None) def remove(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item self._sessions.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.janus_handle.id) def pop(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item self._sessions.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.janus_handle.id) return session def clear(self): self._sessions.clear() self._id_map.clear() def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._sessions class OperationName(str): __normalizer__ = str.maketrans('-', '_') @property def normalized(self): return self.translate(self.__normalizer__) class Operation(object): __slots__ = 'type', 'name', 'data' __types__ = 'request', 'event' # noinspection PyShadowingBuiltins def __init__(self, type, name, data): if type not in self.__types__: raise ValueError("Can't instantiate class {.__class__.__name__} with unknown type: {!r}".format(self, type)) self.type = type self.name = OperationName(name) self.data = data class APIError(Exception): pass class GreenEvent(object): def __init__(self): self._event = coros.event() def set(self): if self._event.ready(): return self._event.send(True) def is_set(self): return self._event.ready() def clear(self): if self._event.ready(): self._event.reset() def wait(self): return self._event.wait() # noinspection PyPep8Naming @implementer(IObserver) class ConnectionHandler(object): janus = JanusBackend() def __init__(self, protocol): self.protocol = protocol self.device_id = base64.b64encode(hashlib.md5(protocol.peer.encode('utf-8')).digest()).rstrip(b'=\n').decode('utf-8') self.janus_session = None # type: Optional[JanusSession] self.accounts_map = {} # account ID -> account self.devices_map = {} # device ID -> account self.connections_map = {} # peer connection -> account self.account_handles_map = {} # Janus handle ID -> account self.sip_sessions = SessionContainer() # type: SessionContainer[SIPSessionInfo] # incoming and outgoing SIP sessions self.videoroom_sessions = SessionContainer() # type: SessionContainer[VideoroomSessionInfo] # publisher and subscriber sessions in video rooms self.ready_event = GreenEvent() self.resolver = DNSLookup() self.proc = proc.spawn(self._operations_handler) self.operations_queue = coros.queue() self.log = ConnectionLogger(self) self.state = None self._stop_pending = False self.decline_code = JanusConfig.decline_code or 486 @run_in_green_thread def start(self): self.state = 'starting' try: self.janus_session = JanusSession() except Exception as e: self.state = 'failed' self.log.warning('could not create session, disconnecting: %s' % e) if self._stop_pending: # if stop was already called it means we were already disconnected self.stop() else: self.protocol.disconnect(3000, str(e)) else: self.state = 'started' self.ready_event.set() if self._stop_pending: self.stop() else: self.send(sylkrtc.ReadyEvent()) def stop(self): if self.state in (None, 'starting'): self._stop_pending = True return self.state = 'stopping' self._stop_pending = False if self.proc is not None: # Kill the operation's handler proc first, in order to not have any operations active while we cleanup. self.proc.kill() # Also proc.kill() will switch to another green thread, which is another reason to do it first so that self.proc = None # we do not switch to another green thread in the middle of the cleanup with a partially deleted handler if self.ready_event.is_set(): # Do not explicitly detach the janus plugin handles before destroying the janus session. Janus runs each request in a different # thread, so making detach and destroy request without waiting for the detach to finish can result in errors from race conditions. # Because we do not want to wait for them, we will rely instead on the fact that janus automatically detaches the plugin handles # when it destroys a session, so we only remove our event handlers and issue a destroy request for the session. for account_info in list(self.accounts_map.values()): if account_info.janus_handle is not None: self.janus.set_event_handler(account_info.janus_handle.id, None) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=account_info.id) for session in self.sip_sessions: if session.janus_handle is not None: self.janus.set_event_handler(session.janus_handle.id, None) for session in self.videoroom_sessions: if session.janus_handle is not None: self.janus.set_event_handler(session.janus_handle.id, None) if session.chat_handler is not None: notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session.chat_handler) session.chat_handler.end() session.chat_handler = None if session in session.room: # We need to check if the room can be destroyed, else this will never happen reactor.callLater(2, call_in_green_thread, self._maybe_destroy_videoroom_after_disconnect, session.room) session.room.discard(session) session.feeds.clear() self.janus_session.destroy() # this automatically detaches all plugin handles associated with it, no need to manually do it # cleanup self.ready_event.clear() self.accounts_map.clear() self.devices_map.clear() self.connections_map.clear() self.account_handles_map.clear() self.sip_sessions.clear() self.videoroom_sessions.clear() self.janus_session = None self.protocol = None self.state = 'stopped' def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def handle_message(self, message): try: request = sylkrtc.SylkRTCRequest.from_message(message) except sylkrtc.ProtocolError as e: self.log.error(str(e)) except Exception as e: self.log.error('{request_type}: {exception!s}'.format(request_type=message['sylkrtc'], exception=e)) if 'transaction' in message: self.send(sylkrtc.ErrorResponse(transaction=message['transaction'], error=str(e))) else: operation = Operation(type='request', name=request.sylkrtc, data=request) self.operations_queue.send(operation) def send(self, message): if self.protocol is not None: self.protocol.sendMessage(json.dumps(message.__data__)) # internal methods (not overriding / implementing the protocol API) def _cleanup_session(self, session): # should only be called from a green thread. if self.janus_session is None: # The connection was closed, there is noting to do return if session in self.sip_sessions: self.sip_sessions.remove(session) if session.direction == 'outgoing': # Destroy plugin handle for outgoing sessions. For incoming ones it's the same as the account handle, so don't session.janus_handle.detach() def _cleanup_videoroom_session(self, session): # should only be called from a green thread. if self.janus_session is None: # The connection was closed, there is noting to do return if session in self.videoroom_sessions: self.videoroom_sessions.remove(session) if session.type == 'publisher': notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session.chat_handler) session.room.discard(session) session.feeds.clear() session.janus_handle.detach() session.chat_handler.end() self._maybe_destroy_videoroom(session.room) else: session.parent_session.feeds.discard(session.publisher_id) session.janus_handle.detach() def _maybe_destroy_videoroom(self, videoroom): # should only be called from a green thread. if self.protocol is None or self.janus_session is None: # The connection was closed, there is nothing to do return if videoroom in self.protocol.factory.videorooms and not videoroom: self.protocol.factory.videorooms.remove(videoroom) videoroom.cleanup() with VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) as videoroom_handle: videoroom_handle.destroy(room=videoroom.id) videoroom.log.info('destroyed') def _maybe_destroy_videoroom_after_disconnect(self, videoroom): # should only be called from a green thread. if self.protocol is None and not videoroom: videoroom.cleanup() videoroom.log.info('destroyed') def _lookup_sip_proxy(self, uri): # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use # caching to avoid long delays, we randomize the results matching the highest priority route's # transport. proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: sip_uri = SIPURI.parse('sip:%s' % uri) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e)) if not routes: raise DNSLookupError('DNS lookup error: no results found') route = random.choice([r for r in routes if r.transport == routes[0].transport]) self.log.debug('DNS lookup for SIP proxy for {} yielded {}'.format(uri, route)) # Build a proxy URI Sofia-SIP likes return 'sips:{route.address}:{route.port}'.format(route=route) if route.transport == 'tls' else str(route.uri) def _callid_to_uuid(self, callid): hexa = hashlib.md5(callid.encode()).hexdigest() uuidv4 = '%s-%s-%s-%s-%s' % (hexa[:8], hexa[8:12], hexa[12:16], hexa[16:20], hexa[20:]) return uuidv4 def _lookup_sip_target_route(self, uri): if GeneralConfig.local_sip_messages: return Route(address=SIPConfig.local_ip, port=SIPConfig.local_tcp_port, transport='tcp') proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: sip_uri = SIPURI.parse('sip:%s' % uri) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e)) if not routes: raise DNSLookupError('DNS lookup error: no results found') route = random.choice([r for r in routes if r.transport == routes[0].transport]) self.log.debug('DNS lookup for SIP message proxy for {} yielded {}'.format(uri, route)) return route def _send_sip_message(self, account, uri, message_id, content, content_type='text/plain', timestamp=None, add_disposition=True): route = self._lookup_sip_target_route(uri) sip_uri = SIPURI.parse('sip:%s' % uri) if route: identity = str(account.uri) if account.display_name: identity = '"%s" <%s>' % (account.display_name, identity) self.log.debug("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route)) from_uri = SIPURI.parse(account.uri) content = content if isinstance(content, bytes) else content.encode() ns = CPIMNamespace('urn:ietf:params:imdn', 'imdn') additional_headers = [CPIMHeader('Message-ID', ns, message_id)] additional_sip_headers = [] if add_disposition: additional_headers.append(CPIMHeader('Disposition-Notification', ns, 'positive-delivery, display')) if GeneralConfig.local_sip_messages: additional_sip_headers.append(Header('X-Sylk-App', 'webrtcgateway')) payload = CPIMPayload(content, content_type, charset='utf-8', sender=ChatIdentity(from_uri, account.display_name), recipients=[ChatIdentity(sip_uri, None)], timestamp=timestamp if timestamp is not None else str(ISOTimestamp.now()), additional_headers=additional_headers) payload, content_type = payload.encode() credentials = Credentials(username=from_uri.user, password=account.password.encode('utf-8'), digest=True) message_request = Message(FromHeader(from_uri, account.display_name), ToHeader(sip_uri), RouteHeader(route.uri), content_type, payload, credentials=credentials, extra_headers=additional_sip_headers) notification_center = NotificationCenter() notification_center.add_observer(self, sender=message_request) #self._message_queue.append((message_id, content, content_type)) message_request.send() def _send_simple_sip_message(self, account, uri, content, content_type='text/plain'): route = self._lookup_sip_target_route(uri) sip_uri = SIPURI.parse('sip:%s' % uri) if route: identity = str(account) self.log.info("sending simple message from '%s' to '%s' using proxy %s" % (identity, uri, route)) from_uri = SIPURI.parse(f'sip:{identity}') content = content if isinstance(content, bytes) else content.encode() message_request = Message(FromHeader(from_uri), ToHeader(sip_uri), RouteHeader(route.uri), content_type, content, extra_headers=[Header('X-Sylk-To-Sip', 'yes')]) message_request.send() def _fork_event_to_online_accounts(self, account_info, event): for protocol in self.protocol.factory.connections.difference([self.protocol]): connection_handler = protocol.connection_handler try: connection_handler.accounts_map[account_info.id] except KeyError: pass else: connection_handler.send(event) def _send_in_dialog_sip_message(self, session, message_id, content, content_type='text/plain', timestamp=None): identity = str(session.account.uri) if session.account.display_name: identity = '"%s" <%s>' % (session.account.display_name, identity) self.log.info("sending in dialag message from '%s' to '%s' " % (identity, session.remote_identity.uri)) from_uri = SIPURI.parse(session.account.uri) sip_uri = SIPURI.parse('sip:%s' % session.remote_identity.uri) content = content if isinstance(content, bytes) else content.encode() ns = CPIMNamespace('urn:ietf:params:imdn', 'imdn') additional_headers = [CPIMHeader('Message-ID', ns, message_id)] payload = CPIMPayload(content, content_type, charset='utf-8', sender=ChatIdentity(from_uri, session.account.display_name), recipients=[ChatIdentity(sip_uri, None)], timestamp=timestamp if timestamp is not None else str(ISOTimestamp.now()), additional_headers=additional_headers) payload, content_type = payload.encode() session.janus_handle.sendMessage(content_type=content_type, content=payload) session._message_queue.append((message_id, payload, content_type)) def _handle_janus_sip_event(self, event): operation = Operation(type='event', name='janus-sip', data=event) self.operations_queue.send(operation) def _handle_janus_videoroom_event(self, event): operation = Operation(type='event', name='janus-videoroom', data=event) self.operations_queue.send(operation) def _operations_handler(self): self.ready_event.wait() while True: operation = self.operations_queue.wait() handler = getattr(self, '_OH_' + operation.type) handler(operation) del operation, handler def _OH_request(self, operation): handler = getattr(self, '_RH_' + operation.name.normalized) request = operation.data try: handler(request) except (APIError, DNSLookupError, JanusError) as e: self.log.error('{operation.name}: {exception!s}'.format(operation=operation, exception=e)) self.send(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) except Exception as e: self.log.exception('{operation.type} {operation.name}: {exception!s}'.format(operation=operation, exception=e)) self.send(sylkrtc.ErrorResponse(transaction=request.transaction, error='Internal error')) else: self.send(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_event(self, operation): handler = getattr(self, '_EH_' + operation.name.normalized) try: handler(operation.data) except Exception as e: self.log.exception('{operation.type} {operation.name}: {exception!s}'.format(operation=operation, exception=e)) # Request handlers def _RH_ping(self, request): pass def _RH_lookup_public_key(self, request): storage = MessageStorage() public_key = storage.get_public_key(account=request.uri) if isinstance(public_key, defer.Deferred): public_key.addCallback(lambda result: self.send(sylkrtc.LookupPublicKeyEvent(uri=request.uri, public_key=result))) else: self.send(sylkrtc.LookupPublicKeyEvent(uri=request.uri, public_key=public_key)) def _RH_account_add(self, request): if request.account in self.accounts_map: raise APIError('Account {request.account} already added'.format(request=request)) # check if domain is acceptable domain = request.account.partition('@')[2] if not {'*', domain}.intersection(GeneralConfig.sip_domains): raise APIError('SIP domain not allowed: %s' % domain) # Create and store our mapping account_info = AccountInfo(request.account, request.password, request.display_name, request.user_agent, request.incoming_header_prefixes) # get the auth config for domain account_info.auth_handle = AuthHandler(account_info, self) self.accounts_map[account_info.id] = account_info self.devices_map[self.device_id] = account_info.id self.connections_map[self.protocol.peer] = account_info.id notification_center = NotificationCenter() notification_center.add_observer(self, sender=account_info.id) self.log.debug(f'Incoming header prefixes: {request.incoming_header_prefixes}') self.log.info('added using {request.user_agent}'.format(request=request)) def _RH_account_remove(self, request): try: account_info = self.accounts_map.pop(request.account) except KeyError: raise APIError('Unknown account specified for remove: {request.account}'.format(request=request)) # cleanup in case the client didn't unregister before removing the account if account_info.janus_handle is not None: account_info.janus_handle.detach() self.account_handles_map.pop(account_info.janus_handle.id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=account_info.id) self.log.info('removed') try: del(self.devices_map[request.account]) except KeyError: pass try: del(self.connections_map[request.account]) except KeyError: pass def _RH_account_register(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified for register: {request.account}'.format(request=request)) proxy = self._lookup_sip_proxy(request.account) if account_info.janus_handle is not None: # Destroy the existing plugin handle account_info.janus_handle.detach() self.account_handles_map.pop(account_info.janus_handle.id) account_info.janus_handle = None # Create a plugin handle account_info.janus_handle = SIPPluginHandle(self.janus_session, event_handler=self._handle_janus_sip_event) self.account_handles_map[account_info.janus_handle.id] = account_info if ExternalAuthConfig.enable: account_info.auth_handle.authenticate(proxy) else: account_info.janus_handle.register(account_info, proxy=proxy) self.log.info('registering to SIP Proxy {proxy}...'.format(proxy=proxy)) def _RH_account_unregister(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified for unregister: {request.account}'.format(request=request)) if account_info.janus_handle is not None: account_info.janus_handle.detach() self.account_handles_map.pop(account_info.janus_handle.id) account_info.janus_handle = None if 'pn_app' in account_info.contact_params: storage = TokenStorage() storage.remove(request.account, account_info.contact_params['pn_app'], account_info.contact_params['pn_device']) self.log.info('unregistered') def _RH_account_devicetoken(self, request): if request.account not in self.accounts_map: raise APIError('Unknown account specified for token: {request.account}'.format(request=request)) if request.token is not None: account_info = self.accounts_map[request.account] account_info.contact_params = { 'pn_app': request.app, 'pn_tok': request.token, 'pn_type': request.platform, 'pn_device': request.device, 'pn_silent': str(int(request.silent is True)) # janus expects a string } storage = TokenStorage() storage.add(request.account, account_info.contact_params, account_info.user_agent) self.log.info('added token on {request.platform} device {request.device})'.format(request=request)) def _RH_account_message(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) uri = request.uri content_type = request.content_type content = request.content if content_type.startswith('text') else request.content.encode('latin1') message_id = request.message_id timestamp = request.timestamp storage = MessageStorage() storage.add(account=account_info.id, contact=uri, direction="outgoing", content = content if isinstance(content, str) else content.decode('latin1'), content_type=content_type, timestamp=timestamp, disposition_notification=['positive-delivery', 'display'], message_id=message_id, state='pending') self.log.info('sending message ({content_type}) to: {uri}'.format(content_type=content_type, uri=uri)) self._send_sip_message(account_info, uri, message_id, content, content_type, timestamp=timestamp) event = sylkrtc.AccountSyncEvent(account=account_info.id, type='message', action='add', content=request) self._fork_event_to_online_accounts(account_info, event) def _RH_account_disposition_notification(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) uri = request.uri message_id = request.message_id state = request.state if state == 'delivered': notification = DeliveryNotification(state) elif state == 'displayed': notification = DisplayNotification(state) elif state == 'error': notification = DisplayNotification(state) content = IMDNDocument.create(message_id=message_id, datetime=request.timestamp, recipient_uri=uri, notification=notification) storage = MessageStorage() storage.update(account=account_info.id, state=state, message_id=message_id) self.log.info('sending IMDN message ({status}) to: {uri}'.format(status=state, uri=uri)) self._send_sip_message(account_info, uri, str(uuid.uuid4()), content, IMDNDocument.content_type, add_disposition=False) def _RH_account_sync_conversations(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) storage = MessageStorage() - messages = storage[[account_info.id, request.message_id]] + try: + since = request.since + except AttributeError: + since = None + messages = storage[[account_info.id, request.message_id, since]] if isinstance(messages, defer.Deferred): messages.addCallback(lambda result: self.send(sylkrtc.AccountSyncConversationsEvent(account=account_info.id, messages=result))) def _RH_account_mark_conversation_read(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) contact = request.contact event = sylkrtc.AccountMarkConversationReadEventData(contact=request.contact) storage = MessageStorage() storage.mark_conversation_read(account_info.id, contact) storage.add(account=account_info.id, contact=request.contact, direction='', content=request.contact, content_type='application/sylk-conversation-read', timestamp=str(ISOTimestamp.now()), disposition_notification='', message_id=str(uuid.uuid4())) event = sylkrtc.AccountSyncEvent(account=account_info.id, type='conversation', action='read', content=event) self._fork_event_to_online_accounts(account_info, event) def _RH_account_remove_message(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) contact = request.contact message_id = request.message_id storage = MessageStorage() storage.removeMessage(account=account_info.id, message_id=message_id) content = sylkrtc.AccountMessageRemoveEventData(contact=contact, message_id=message_id) storage.add(account=account_info.id, contact=contact, direction='', content=json.dumps(content.__data__), content_type='application/sylk-message-remove', timestamp=str(ISOTimestamp.now()), disposition_notification='', message_id=str(uuid.uuid4())) event = sylkrtc.AccountSyncEvent(account=account_info.id, type='message', action='remove', content=content) self._fork_event_to_online_accounts(account_info, event) self._send_simple_sip_message(contact, account_info.id, json.dumps(content.__data__), 'application/sylk-message-remove') # Delete from receiver def receiver_remove_message(msg_id, messages): for message in messages: if message.message_id == msg_id and message.direction == 'incoming': account = message.account message_id = message.message_id storage = MessageStorage() storage.removeMessage(account=account, message_id=message_id) content = sylkrtc.AccountMessageRemoveEventData(contact=message.contact, message_id=message_id) storage.add(account=account, contact=message.contact, direction='', content=json.dumps(content.__data__), content_type='application/sylk-message-remove', timestamp=str(ISOTimestamp.now()), disposition_notification='', message_id=str(uuid.uuid4())) event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content) account_object = type('account_object', (object,), {'id': account}) self._fork_event_to_online_accounts(account_object, event) self.log.info("Removed receiver message") break messages = storage[[contact, '']] if isinstance(messages, defer.Deferred): messages.addCallback(lambda result: receiver_remove_message(msg_id=request.message_id, messages=result)) def _RH_account_remove_conversation(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) contact = request.contact storage = MessageStorage() storage.removeChat(account=account_info.id, contact=contact) storage.add(account=account_info.id, contact=contact, direction='', content=contact, content_type='application/sylk-conversation-remove', timestamp=str(ISOTimestamp.now()), disposition_notification='', message_id=str(uuid.uuid4())) content = sylkrtc.AccountConversationRemoveEventData(contact=contact) event = sylkrtc.AccountSyncEvent(account=account_info.id, type='conversation', action='remove', content=content) self._fork_event_to_online_accounts(account_info, event) def _RH_session_create(self, request): if request.session in self.sip_sessions: raise APIError('Session ID {request.session} already in use'.format(request=request)) try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) proxy = self._lookup_sip_proxy(request.uri) # Create a new plugin handle and 'register' it, without actually doing so janus_handle = SIPPluginHandle(self.janus_session, event_handler=self._handle_janus_sip_event) headers = {'headers': request.headers.__data__} if request.headers is not None else {} try: janus_handle.call(account_info, uri=request.uri, sdp=request.sdp, proxy=proxy, **headers) except Exception: janus_handle.detach() raise session_info = SIPSessionInfo(request.session) session_info.janus_handle = janus_handle session_info.init_outgoing(account_info, request.uri) self.sip_sessions.add(session_info) self.log.info('outgoing session {request.session} to {request.uri}'.format(request=request)) def _RH_session_answer(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session {request.session}'.format(request=request)) if session_info.direction != 'incoming': raise APIError('Cannot answer outgoing session {request.session}'.format(request=request)) if session_info.state != 'connecting': raise APIError('Invalid state for answering session {session.id}: {session.state}'.format(session=session_info)) headers = {'headers': request.headers.__data__} if request.headers is not None else {} session_info.janus_handle.accept(sdp=request.sdp, **headers) self.log.info('incoming session {session.id} answered'.format(session=session_info)) def _RH_session_trickle(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session {request.session}'.format(request=request)) if session_info.state == 'terminated': raise APIError('Session {request.session} is terminated'.format(request=request)) session_info.janus_handle.trickle(request.candidates) if not request.candidates: self.log.debug('session {session.id} negotiated ICE'.format(session=session_info)) def _RH_session_terminate(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session {request.session}'.format(request=request)) if session_info.state not in ('connecting', 'progress', 'early_media', 'accepted', 'established'): raise APIError('Invalid state for terminating session {session.id}: {session.state}'.format(session=session_info)) if session_info.direction == 'incoming' and session_info.state == 'connecting': session_info.janus_handle.decline(self.decline_code) else: session_info.janus_handle.hangup() self.log.info('{session.direction} session {session.id} will terminate'.format(session=session_info)) def _RH_session_message(self, request): self.log.info("Sending janus in dailaog message") try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session {request.session}'.format(request=request)) if session_info.state not in ('established'): raise APIError('Invalid state session {session.id}: {session.state}i for sending messages'.format(session=session_info)) self._send_in_dialog_sip_message(session_info, message_id=request.message_id, content=request.content, content_type=request.content_type, timestamp=request.timestamp) def _RH_videoroom_join(self, request): if request.session in self.videoroom_sessions: raise APIError('Session ID {request.session} already in use'.format(request=request)) try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) try: videoroom = self.protocol.factory.videorooms[request.uri] except KeyError: videoroom = Videoroom(request.uri, request.audio, request.video) self.protocol.factory.videorooms.add(videoroom) if not videoroom.allow_uri(request.account): self._maybe_destroy_videoroom(videoroom) raise APIError('is not allowed to join room {request.uri}'.format(request=request)) if ('m=video' in request.sdp and 'm=audio' in request.sdp): media = 'audio/video' elif ('m=video' in request.sdp): media = 'video only' elif ('m=audio' in request.sdp): media = 'audio only' else: media = 'unknown' try: videoroom_handle = VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) try: try: videoroom_handle.create(room=videoroom.id, config=videoroom.config, publishers=10) except JanusError as e: if e.code != 427: # 427 means room already exists raise else: self.log.info('created room {room}'.format(room=request.uri)) videoroom_handle.join(room=videoroom.id, sdp=request.sdp, display_name=account_info.display_name, audio=videoroom.audio, video=videoroom.video) except Exception: videoroom_handle.detach() raise except Exception: self._maybe_destroy_videoroom(videoroom) raise videoroom_session = VideoroomSessionInfo(request.session, owner=self, janus_handle=videoroom_handle) videoroom_session.init_publisher(account=account_info, room=videoroom) self.log.info('publish {media} to room {room}'.format(room=request.uri, media=media)) self.videoroom_sessions.add(videoroom_session) notification_center = NotificationCenter() notification_center.add_observer(self, sender=videoroom_session.chat_handler) videoroom_session.chat_handler.start() self.send(sylkrtc.VideoroomSessionProgressEvent(session=videoroom_session.id)) def _RH_videoroom_leave(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) videoroom_session.janus_handle.leave() self.send(sylkrtc.VideoroomSessionTerminatedEvent(session=videoroom_session.id)) # safety net in case we do not get any answer for the leave request # todo: to be adjusted later after pseudo-synchronous communication with janus is implemented reactor.callLater(2, call_in_green_thread, self._cleanup_videoroom_session, videoroom_session) self.log.debug('leaving room {session.room.uri}'.format(session=videoroom_session)) def _RH_videoroom_configure(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) videoroom = videoroom_session.room # todo: should we send out events if the active participant list did not change? try: videoroom.active_participants = request.active_participants except ValueError as e: raise APIError(str(e)) for session in videoroom: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=videoroom.active_participants, originator=request.session)) def _RH_videoroom_feed_attach(self, request): # sent when a feed is subscribed for a given publisher if request.feed in self.videoroom_sessions: raise APIError('Video room session ID {request.feed} already in use'.format(request=request)) try: base_session = self.videoroom_sessions[request.session] # our 'base' session (the one used to join and publish) except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) try: publisher_session = base_session.room[request.publisher] # the publisher's session (the one we want to subscribe to) except KeyError: raise APIError('Unknown publisher room session to attach to: {request.publisher}'.format(request=request)) if publisher_session.publisher_id is None: raise APIError('Video room session {session.id} does not have a publisher ID'.format(session=publisher_session)) videoroom_handle = VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) try: videoroom_handle.feed_attach(room=base_session.room.id, feed=publisher_session.publisher_id, offer_audio=base_session.room.audio, offer_video=base_session.room.video) except Exception: videoroom_handle.detach() raise videoroom_session = VideoroomSessionInfo(request.feed, owner=self, janus_handle=videoroom_handle) videoroom_session.init_subscriber(publisher_session, parent_session=base_session) self.videoroom_sessions.add(videoroom_session) base_session.feeds.add(publisher_session) self.log.debug('subscribe to {account} in room {session.room.uri} {feeds}'.format(account=publisher_session.account.id, session=videoroom_session, feeds=len(base_session.feeds))) def _RH_videoroom_feed_answer(self, request): try: videoroom_session = self.videoroom_sessions[request.feed] except KeyError: raise APIError('Unknown room session: {request.feed}'.format(request=request)) if videoroom_session.parent_session.id != request.session: raise APIError('{request.feed} is not an attached feed of {request.session}'.format(request=request)) if ('m=video' in request.sdp and 'm=audio' in request.sdp): media = 'audio/video' elif ('m=video' in request.sdp): media = 'video only' elif ('m=audio' in request.sdp): media = 'audio only' else: media = 'unknown' self.log.debug('{media} media accepted by room {session.room.uri}'.format(media=media, session=videoroom_session)) videoroom_session.janus_handle.feed_start(sdp=request.sdp) def _RH_videoroom_feed_detach(self, request): try: videoroom_session = self.videoroom_sessions[request.feed] except KeyError: raise APIError('Unknown room session to detach: {request.feed}'.format(request=request)) if videoroom_session.parent_session.id != request.session: raise APIError('{request.feed} is not an attached feed of {request.session}'.format(request=request)) videoroom_session.janus_handle.feed_detach() # safety net in case we do not get any answer for the feed_detach request # todo: to be adjusted later after pseudo-synchronous communication with janus is implemented self.log.debug('unsubscribe from {account} in room {session.room.uri}'.format(account=videoroom_session.room[videoroom_session.publisher_id].account.id, session=videoroom_session)) reactor.callLater(2, call_in_green_thread, self._cleanup_videoroom_session, videoroom_session) def _RH_videoroom_invite(self, request): try: base_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) room = base_session.room participants = set(request.participants) originator = sylkrtc.SIPIdentity(uri=base_session.account.id, display_name=base_session.account.display_name) session_id = str(random.getrandbits(32)) event = sylkrtc.AccountConferenceInviteEvent(account='placeholder', room=room.uri, originator=originator, session_id=self._callid_to_uuid(session_id)) for protocol in self.protocol.factory.connections.difference([self.protocol]): connection_handler = protocol.connection_handler for account in participants.intersection(connection_handler.accounts_map): event.account = account connection_handler.send(event) room.log.info('invitation from %s for %s', originator.uri, account) room.log.debug('invitation from %s for %s with session-id %s', originator.uri, account, session_id) connection_handler.log.info('received an invitation from %s for %s to join room %s', originator.uri, account, room.uri) for participant in participants.difference([base_session.account.id]): if not any(session.account.id == participant for session in base_session.room): push.conference_invite(originator=originator, destination=participant, room=room.uri, call_id=session_id, audio=room.audio, video=room.video) def _RH_videoroom_session_trickle(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) videoroom_session.janus_handle.trickle(request.candidates) if not request.candidates and videoroom_session.type == 'publisher': self.log.debug('ICE negotiation to room {session.room.uri} completed'.format(session=videoroom_session)) def _RH_videoroom_session_update(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) options = request.options.__data__ if options: videoroom_session.janus_handle.update_publisher(options) modified = ', '.join('{}={}'.format(key, options[key]) for key in options) media = 'video' try: has_video = options['video'] except KeyError: pass else: if not has_video: media = 'audio only' self.log.info('switched to {media} media to {account} in room {session.room.uri}'.format(account=videoroom_session.room[videoroom_session.publisher_id].account.id, session=videoroom_session, media=media)) def _RH_videoroom_message(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) content_type = request.content_type content = request.content if content_type.startswith('text') else request.content.encode('latin1') message_id = request.message_id videoroom_session.chat_handler.send_message(message_id, content, content_type) def _RH_videoroom_composing_indication(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) videoroom_session.chat_handler.send_composing_indication(request.state, request.refresh) def _RH_videoroom_mute_audio_participants(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) videoroom = videoroom_session.room for session in videoroom: session.owner.send(sylkrtc.VideoroomMuteAudioEvent(session=session.id, originator=request.session)) def _RH_videoroom_toggle_hand(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown room session: {request.session}'.format(request=request)) videoroom = videoroom_session.room if request.session_id: request_session = request.session_id else: request_session = request.session videoroom.raised_hands = request_session for session in videoroom: session.owner.send(sylkrtc.VideoroomRaisedHandsEvent(session=session.id, raised_hands=videoroom.raised_hands)) # Event handlers def _EH_janus_sip(self, event): if isinstance(event, janus.PluginEvent): event_id = event.plugindata.data.__id__ try: handler = getattr(self, '_EH_janus_' + '_'.join(event_id)) except AttributeError: self.log.warning('unhandled Janus SIP event: {event_name}'.format(event_name=event_id[-1])) else: self.log.debug('janus SIP event: {event_name} (handle_id={event.sender})'.format(event=event, event_name=event_id[-1])) handler(event) else: # janus.CoreEvent try: handler = getattr(self, '_EH_janus_sip_' + event.janus) except AttributeError: self.log.warning('unhandled Janus SIP event: {event.janus}'.format(event=event)) else: self.log.debug('janus SIP event: {event.janus} (handle_id={event.sender})'.format(event=event)) handler(event) def _EH_janus_sip_error(self, event): # fixme: implement error handling self.log.error('got SIP error event: {}'.format(event.__data__)) handle_id = event.sender if handle_id in self.sip_sessions: pass # this is a session related event elif handle_id in self.account_handles_map: pass # this is an account related event def _EH_janus_sip_webrtcup(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for webrtcup event'.format(event=event)) return session_info.state = 'established' self.send(sylkrtc.SessionEstablishedEvent(session=session_info.id)) self.log.info('{session.direction} session {session.id} established'.format(session=session_info)) def _EH_janus_sip_hangup(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: return if session_info.state != 'terminated': session_info.state = 'terminated' reason = event.reason or 'unspecified reason' self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) self._cleanup_session(session_info) def _EH_janus_sip_slowlink(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for slowlink event'.format(event=event)) return if event.uplink: # uplink is from janus' point of view if not session_info.slow_download: self.log.debug('poor download connectivity for session {session.id}'.format(session=session_info)) session_info.slow_download = True else: if not session_info.slow_upload: self.log.debug('poor upload connectivity for session {session.id}'.format(session=session_info)) session_info.slow_upload = True def _EH_janus_sip_media(self, event): pass def _EH_janus_sip_detached(self, event): pass def _EH_janus_sip_event_registering(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for registering event'.format(event=event)) return if account_info.registration_state != 'registering': account_info.registration_state = 'registering' self.send(sylkrtc.AccountRegisteringEvent(account=account_info.id)) def _EH_janus_sip_event_registered(self, event): if event.sender in self.sip_sessions: # skip 'registered' events from outgoing session handles return try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for registered event'.format(event=event)) return if account_info.registration_state != 'registered': account_info.registration_state = 'registered' self.send(sylkrtc.AccountRegisteredEvent(account=account_info.id)) self.log.info('registered') storage = MessageStorage() storage.add_account(account=account_info.id) def _EH_janus_sip_event_registration_failed(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for registration failed event'.format(event=event)) return if account_info.registration_state != 'failed': account_info.registration_state = 'failed' reason = '{result.code} {result.reason}'.format(result=event.plugindata.data.result) self.send(sylkrtc.AccountRegistrationFailedEvent(account=account_info.id, reason=reason)) self.log.info('registration failed: {reason}'.format(reason=reason)) def _EH_janus_sip_event_incomingcall(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for incoming call event'.format(event=event)) return assert event.jsep is not None data = event.plugindata.data.result # type: janus.SIPResultIncomingCall call_id = event.plugindata.data.call_id originator = sylkrtc.SIPIdentity(uri=data.username, display_name=data.displayname) headers = {'headers': data.headers} if data.headers else {} session = SIPSessionInfo(self._callid_to_uuid(call_id)) session.janus_handle = account_info.janus_handle session.init_incoming(account_info, originator.uri, originator.display_name) self.sip_sessions.add(session) self.send(sylkrtc.AccountIncomingSessionEvent(account=account_info.id, session=session.id, originator=originator, sdp=event.jsep.sdp, call_id=call_id, **headers)) self.log.info('incoming session {session.id} from {session.remote_identity.uri!s}'.format(session=session)) def _EH_janus_sip_event_missed_call(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for missed call event'.format(event=event)) return data = event.plugindata.data.result # type: janus.SIPResultMissedCall originator = sylkrtc.SIPIdentity(uri=data.caller, display_name=data.displayname) self.send(sylkrtc.AccountMissedSessionEvent(account=account_info.id, originator=originator)) self.log.info('missed incoming call from {originator.uri}'.format(originator=originator)) def _EH_janus_sip_event_calling(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for calling event'.format(event=event)) return session_info.state = 'progress' self.send(sylkrtc.SessionProgressEvent(session=session_info.id)) self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) def _EH_janus_sip_event_accepted(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for accepted event'.format(event=event)) return if session_info.state == 'established' or session_info.state == 'early_media': # We had early media session_info.state = 'accepted' self.send(sylkrtc.SessionAcceptedEvent(session=session_info.id)) self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) return session_info.state = 'accepted' if session_info.direction == 'outgoing': assert event.jsep is not None data = event.plugindata.data.result # type: janus.SIPResultAccepted headers = {'headers': data.headers} if data.headers else {} self.send(sylkrtc.SessionAcceptedEvent(session=session_info.id, sdp=event.jsep.sdp, call_id=event.plugindata.data.call_id, **headers)) else: self.send(sylkrtc.SessionAcceptedEvent(session=session_info.id)) self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) def _EH_janus_sip_event_hangup(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for hangup event'.format(event=event)) return if session_info.state != 'terminated': session_info.state = 'terminated' data = event.plugindata.data.result # type: janus.SIPResultHangup reason = '{0.code} {0.reason}'.format(data) self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) if session_info.direction == 'incoming' and data.code == 487: # incoming call was cancelled -> missed self.send(sylkrtc.AccountMissedSessionEvent(account=session_info.account.id, originator=session_info.remote_identity.__dict__)) if data.code >= 300: self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) else: self.log.info('{session.direction} session {session.id} terminated'.format(session=session_info)) self._cleanup_session(session_info) def _EH_janus_sip_event_declining(self, event): pass def _EH_janus_sip_event_hangingup(self, event): pass def _EH_janus_sip_event_proceeding(self, event): pass def _EH_janus_sip_event_progress(self, event): if (event.jsep): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for progress event'.format(event=event)) return session_info.state = 'early_media' self.log.info('{session.direction} session {session.id} has early media'.format(session=session_info)) self.send(sylkrtc.SessionEarlyMediaEvent(session=session_info.id, sdp=event.jsep.sdp, call_id=event.plugindata.data.call_id)) self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) def _EH_janus_sip_event_ringing(self, event): pass def _EH_janus_sip_event_message(self, event): data = event.plugindata.data.result # type: janus.SIPResultMessage try: session_info = self.sip_sessions[event.sender] except KeyError: return if not event.plugindata.data.call_id: return cpim_message = None if data.content_type in ("application/im-iscomposing+xml", "text/pgp-public-key"): return elif data.content_type == "message/cpim": try: content = data.content if isinstance(data.content, str) else data.content.decode('latin1') cpim_message = CPIMPayload.decode(content.encode('utf-8')) except CPIMParserError: self.log.info('message rejected: CPIM parse error') return else: body = cpim_message.content if isinstance(cpim_message.content, str) else cpim_message.content.decode() content_type = cpim_message.content_type sender = cpim_message.sender or FromHeader(SIPURI.parse('{}'.format(data.sender)), data.displayname) disposition = next(([item.strip() for item in header.value.split(',')] for header in cpim_message.additional_headers if header.name == 'Disposition-Notification'), None) message_id = next((header.value for header in cpim_message.additional_headers if header.name == 'Message-ID'), None) else: body = data.content content_type = data.content_type sender = FromHeader(SIPURI.parse('{}'.format(data.sender)), data.displayname) disposition = None message_id = str(uuid.uuid4()) timestamp = str(cpim_message.timestamp) if cpim_message is not None and cpim_message.timestamp is not None else str(ISOTimestamp.now()) sender = sylkrtc.SIPIdentity(uri=str(sender.uri), display_name=sender.display_name) if content_type in ("application/im-iscomposing+xml", "text/pgp-public-key"): return if content_type == IMDNDocument.content_type: document = IMDNDocument.parse(body) imdn_message_id = document.message_id.value imdn_status = document.notification.status.__str__() imdn_datetime = document.datetime.__str__() self.log.info('received in dialog IMDN message ({status}) from: {originator.uri}'.format(status=imdn_status, originator=sender)) self.send(sylkrtc.SessionMessageDispositionNotificationEvent(session=session_info.id, state=imdn_status, message_id=imdn_message_id, message_timestamp=imdn_datetime, timestamp=timestamp, code=200, reason='')) else: self.log.info('received n dialog message ({content_type}) from: {originator.uri}'.format(content_type=content_type, originator=sender)) self.send(sylkrtc.SessionMessageEvent(session=session_info.id, sender=sender, content=body, content_type=content_type, timestamp=timestamp, disposition_notification=disposition, message_id=message_id)) def _EH_janus_sip_event_messagesent(self, event): pass def _EH_janus_sip_event_messagedelivery(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for delivery event'.format(event=event)) return data = event.plugindata.data.result message_id, content, content_type = session_info._message_queue.popleft() body = CPIMPayload.decode(content) timestamp = body.timestamp if data.code < 300: self.log.info('in dialog message was delivered to remote party: %s', data.reason) state = 'accepted' else: self.log.info('message was not delivered to remote party %s: %s', data.code, data.reason) state = 'failed' self.send(sylkrtc.SessionMessageDispositionNotificationEvent(session=session_info.id, code=data.code, reason=data.reason, state=state, message_id=message_id, message_timestamp=str(timestamp), timestamp=str(ISOTimestamp.now()))) def _EH_janus_videoroom(self, event): if isinstance(event, janus.PluginEvent): event_id = event.plugindata.data.__id__ try: handler = getattr(self, '_EH_janus_' + '_'.join(event_id)) except AttributeError: self.log.warning('unhandled Janus videoroom event: {event_name}'.format(event_name=event_id[-1])) else: self.log.debug('janus videoroom event: {event_name} (handle_id={event.sender})'.format(event=event, event_name=event_id[-1])) handler(event) else: # janus.CoreEvent try: handler = getattr(self, '_EH_janus_videoroom_' + event.janus) except AttributeError: self.log.warning('unhandled Janus videoroom event: {event.janus}'.format(event=event)) else: self.log.debug('janus videoroom event: {event.janus} (handle_id={event.sender})'.format(event=event)) handler(event) def _EH_janus_videoroom_error(self, event): # fixme: implement error handling self.log.error('got videoroom error event: {}'.format(event.__data__)) try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find room session with handle ID {event.sender} for error event'.format(event=event)) return if videoroom_session.type == 'publisher': pass else: pass def _EH_janus_videoroom_webrtcup(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find room session with handle ID {event.sender} for webrtcup event'.format(event=event)) return if videoroom_session.type == 'publisher': self.log.debug('media published to room {session.room.uri}'.format(session=videoroom_session)) self.send(sylkrtc.VideoroomSessionEstablishedEvent(session=videoroom_session.id)) else: self.send(sylkrtc.VideoroomFeedEstablishedEvent(session=videoroom_session.parent_session.id, feed=videoroom_session.id)) def _EH_janus_videoroom_hangup(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: return reactor.callLater(2, call_in_green_thread, self._cleanup_videoroom_session, videoroom_session) self.log.debug('session with room {session.room.uri} ended'.format(session=videoroom_session)) def _EH_janus_videoroom_slowlink(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find room session with handle ID {event.sender} for slowlink event'.format(event=event)) return if event.uplink: # uplink is from janus' point of view if not videoroom_session.slow_download: self.log.debug('poor download connectivity to room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) videoroom_session.slow_download = True else: if not videoroom_session.slow_upload: self.log.debug('poor upload connectivity to room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) videoroom_session.slow_upload = True def _EH_janus_videoroom_media(self, event): pass def _EH_janus_videoroom_detached(self, event): pass def _EH_janus_videoroom_joined(self, event): # send when a publisher successfully joined a room try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find room session with handle ID {event.sender} for joined event'.format(event=event)) return if ('m=video' in event.jsep.sdp and 'm=audio' in event.jsep.sdp): media = 'audio/video' elif ('m=video' in event.jsep.sdp): media = 'video only' elif ('m=audio' in event.jsep.sdp): media = 'audio only' else: media = 'unknown' self.log.info('joined room {session.room.uri} with {media}'.format(session=videoroom_session, media=media)) self.log.debug('joined room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) data = event.plugindata.data # type: janus.VideoroomJoined videoroom_session.publisher_id = data.id room = videoroom_session.room assert event.jsep is not None self.send(sylkrtc.VideoroomSessionAcceptedEvent(session=videoroom_session.id, sdp=event.jsep.sdp, audio=room.audio, video=room.video)) # send information about existing publishers publishers = [] for publisher in data.publishers: # type: janus.VideoroomPublisher try: publisher_session = room[publisher.id] except KeyError: self.log.warning('could not find matching session for publisher {publisher.id} during joined event'.format(publisher=publisher)) else: publishers.append(dict(id=publisher_session.id, uri=publisher_session.account.id, display_name=publisher.display or '')) self.send(sylkrtc.VideoroomInitialPublishersEvent(session=videoroom_session.id, publishers=publishers)) room.add(videoroom_session) # adding the session to the room might also trigger sending an event with the active participants which must be sent last def _EH_janus_videoroom_attached(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find room session with handle ID {event.sender} for attached event'.format(event=event)) return # get the session which originated the subscription base_session = videoroom_session.parent_session assert base_session is not None assert event.jsep is not None and event.jsep.type == 'offer' if ('m=video' in event.jsep.sdp and 'm=audio' in event.jsep.sdp): media = 'audio/video' elif ('m=video' in event.jsep.sdp): media = 'video only' elif ('m=audio' in event.jsep.sdp): media = 'audio only' else: media = 'unknown' self.log.debug('{media} media proposed to room {session.room.uri}'.format(session=videoroom_session, media=media)) self.send(sylkrtc.VideoroomFeedAttachedEvent(session=base_session.id, feed=videoroom_session.id, sdp=event.jsep.sdp)) def _EH_janus_videoroom_slow_link(self, event): pass def _EH_janus_videoroom_updated(self, event): pass def _EH_janus_videoroom_event_publishers(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find room session with handle ID {event.sender} for publishers event'.format(event=event)) return room = videoroom_session.room # send information about new publishers publishers = [] for publisher in event.plugindata.data.publishers: # type: janus.VideoroomPublisher try: publisher_session = room[publisher.id] except KeyError: self.log.warning('could not find matching session for publisher {publisher.id} during publishers event'.format(publisher=publisher)) continue publishers.append(dict(id=publisher_session.id, uri=publisher_session.account.id, display_name=publisher.display or '')) self.send(sylkrtc.VideoroomPublishersJoinedEvent(session=videoroom_session.id, publishers=publishers)) def _EH_janus_videoroom_event_leaving(self, event): # this is a publisher publisher_id = event.plugindata.data.leaving # publisher_id == 'ok' when the event is about ourselves leaving the room, else the publisher's janus ID try: base_session = self.videoroom_sessions[event.sender] except KeyError: if publisher_id != 'ok': self.log.warning('could not find room session with handle ID {event.sender} for leaving event'.format(event=event)) return if publisher_id == 'ok': self.log.info('left room {session.room.uri}'.format(session=base_session)) self.log.debug('left room {session.room.uri} with session {session.id}'.format(session=base_session)) self._cleanup_videoroom_session(base_session) return try: publisher_session = base_session.feeds.pop(publisher_id) except KeyError: return self.send(sylkrtc.VideoroomPublishersLeftEvent(session=base_session.id, publishers=[publisher_session.id])) def _EH_janus_videoroom_event_left(self, event): # this is a subscriber try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: pass else: self._cleanup_videoroom_session(videoroom_session) def _EH_janus_videoroom_event_configured(self, event): pass def _EH_janus_videoroom_event_started(self, event): pass def _EH_janus_videoroom_event_unpublished(self, event): pass # Notification handlers def _NH_ChatSessionGotMessage(self, notification): session = notification.sender.sylk_session # type: VideoroomSessionInfo message = notification.data.message sender = sylkrtc.SIPIdentity(uri=str(message.sender.uri), display_name=message.sender.display_name) content = message.content if isinstance(message.content, str) else message.content.decode('latin1') # preserve binary data for transmitting over JSON if any(header.name == 'Message-Type' and header.value == 'status' and header.namespace == 'urn:ag-projects:xml:ns:cpim' for header in message.additional_headers): message_type = 'status' else: message_type = 'normal' self.send(sylkrtc.VideoroomMessageEvent(session=session.id, content=content, content_type=message.content_type, sender=sender, timestamp=str(message.timestamp), type=message_type)) def _NH_ChatSessionGotComposingIndication(self, notification): session = notification.sender.sylk_session # type: VideoroomSessionInfo composing = notification.data sender = sylkrtc.SIPIdentity(uri=str(composing.sender.uri), display_name=composing.sender.display_name) self.send(sylkrtc.VideoroomComposingIndicationEvent(session=session.id, state=composing.state, refresh=composing.refresh, content_type=composing.content_type, sender=sender)) def _NH_ChatSessionDidDeliverMessage(self, notification): session = notification.sender.sylk_session # type: VideoroomSessionInfo data = notification.data self.send(sylkrtc.VideoroomMessageDeliveryEvent(session=session.id, delivered=True, message_id=data.message_id, code=data.code, reason=data.reason)) def _NH_ChatSessionDidNotDeliverMessage(self, notification): session = notification.sender.sylk_session # type: VideoroomSessionInfo data = notification.data self.send(sylkrtc.VideoroomMessageDeliveryEvent(session=session.id, delivered=False, message_id=data.message_id, code=data.code, reason=data.reason)) def _NH_SIPApplicationGotAccountDispositionNotification(self, notification): try: self.accounts_map[notification.sender] except KeyError: return message = notification.data.message self.log.info('received IMDN message ({status}) from: {originator.uri}'.format(status=message.state, originator=notification.data.sender)) self.send(message) def _NH_SIPApplicationGotAccountMessage(self, notification): try: self.accounts_map[notification.sender] except KeyError: return message = notification.data self.log.info('received message ({content_type}) from: {originator.uri}'.format(content_type=message.content_type, originator=message.sender)) self.send(message) def _NH_SIPApplicationGotOutgoingAccountMessage(self, notification): try: self.accounts_map[notification.sender] except KeyError: return message = notification.data self.log.info('received outgoing message ({content_type}) to {destination}'.format(content_type=message.content.content_type, destination=message.content.uri)) self.send(message) def _NH_SIPApplicationGotAccountRemoveMessage(self, notification): try: self.accounts_map[notification.sender] except KeyError: return message = notification.data self.send(message) def _NH_SIPMessageDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) self.log.info('message was accepted by remote party') data = notification.data body = CPIMPayload.decode(notification.sender.body) message_id = next((header.value for header in body.additional_headers if header.name == 'Message-ID'), None) account_info = self.accounts_map['{}@{}'.format(body.sender.uri.user.decode('utf-8'), body.sender.uri.host.decode('utf-8'))] timestamp = body.timestamp if body.content_type != IMDNDocument.content_type: storage = MessageStorage() storage.update(account=account_info.id, state='accepted', message_id=message_id) event = sylkrtc.AccountDispositionNotificationEvent(account=account_info.id, state='accepted', message_id=message_id, message_timestamp=str(timestamp), code=data.code, reason=data.reason, timestamp=str(ISOTimestamp.now())) self.send(event) self._fork_event_to_online_accounts(account_info, event) def _NH_SIPMessageDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) data = notification.data body = CPIMPayload.decode(notification.sender.body) reason = data.reason.decode() if isinstance(data.reason, bytes) else data.reason callid = data.headers.get('Call-ID', Null).body if hasattr(data, 'headers') else None self.log.warning('could not deliver message to %s: %d %s (%s)' % (', '.join(([str(item.uri) for item in body.recipients])), data.code, reason, callid)) message_id = next((header.value for header in body.additional_headers if header.name == 'Message-ID'), None) account = '{}@{}'.format(body.sender.uri.user.decode('utf-8'), body.sender.uri.host.decode('utf-8')) timestamp = body.timestamp if body.content_type != IMDNDocument.content_type: storage = MessageStorage() storage.update(account=account, state='failed', message_id=message_id) event = sylkrtc.AccountDispositionNotificationEvent(account=account, state='failed', message_id=message_id, code=data.code, reason=reason, message_timestamp=str(timestamp), timestamp=str(ISOTimestamp.now())) self.send(event) try: account_info = self.accounts_map[account] except KeyError: pass else: self._fork_event_to_online_accounts(account_info, event) # noinspection PyPep8Naming @implementer(IObserver) class VideoroomChatHandler(object): def __init__(self, session): self.sylk_session = session # type: VideoroomSessionInfo self.sip_session = None # type: Optional[Session] self.chat_stream = None self._started = False self._ended = False self._message_queue = deque() @property def account(self): return self.sylk_session.account @property def room(self): return self.sylk_session.room @run_in_green_thread def start(self): if self._started: return self._started = True notification_center = NotificationCenter() from_uri = SIPURI.parse(self.account.uri) to_uri = SIPURI.parse('sip:{}'.format(self.room.uri)) to_uri.host = to_uri.host.replace(b'videoconference', b'conference', 1) # TODO: find a way to define this credentials = Credentials(username=from_uri.user, password=self.account.password.encode('utf-8'), digest=True) sip_account = DefaultAccount() sip_settings = SIPSimpleSettings() if sip_account.sip.outbound_proxy is not None: uri = SIPURI(host=sip_account.sip.outbound_proxy.host, port=sip_account.sip.outbound_proxy.port, parameters={'transport': sip_account.sip.outbound_proxy.transport}) else: uri = to_uri lookup = DNSLookup() try: route = lookup.lookup_sip_proxy(uri, sip_settings.sip.transport_list).wait()[0] except (DNSLookupError, IndexError): self.end() self.room.log.error('DNS lookup for SIP proxy for {} failed'.format(uri)) self.room.log.error('chat session for {} failed: DNS lookup error'.format(self.account.id)) notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(originator='local', code=0, reason=None, failure_reason='DNS lookup error')) return if self._ended: # end was called during DNS lookup self.room.log.debug('chat session for {} ended'.format(self.account.id)) notification_center.post_notification('ChatSessionDidEnd', sender=self) return self.sip_session = Session(sip_account) self.chat_stream = MediaStreamRegistry.ChatStream() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.chat_stream) self.room.log.debug('chat {} starting at {}'.format(to_uri, route)) self.sip_session.connect(FromHeader(from_uri, self.account.display_name), ToHeader(to_uri), route=route, streams=[self.chat_stream], credentials=credentials) @run_in_twisted_thread def end(self): if self._ended: return notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.chat_stream) self.sip_session.end() self.sip_session = None self.chat_stream = None self.room.log.debug('chat session for {} ended'.format(self.account.id)) notification_center.post_notification('ChatSessionDidEnd', sender=self) while self._message_queue: message_id, content, content_type = self._message_queue.popleft() data = NotificationData(message_id=message_id, message=None, code=0, reason='Chat session ended') notification_center.post_notification('ChatSessionDidNotDeliverMessage', sender=self, data=data) self._ended = True @run_in_twisted_thread def send_message(self, message_id, content, content_type='text/plain'): if self._ended: notification_center = NotificationCenter() data = NotificationData(message_id=message_id, message=None, code=0, reason='Chat session ended') notification_center.post_notification('ChatSessionDidNotDeliverMessage', sender=self, data=data) else: self._message_queue.append((message_id, content, content_type)) if self.chat_stream is not None: self._send_queued_messages() @run_in_twisted_thread def send_composing_indication(self, state, refresh=None): if self.chat_stream is not None: self.chat_stream.send_composing_indication(state, refresh=refresh) def _send_queued_messages(self): while self._message_queue: message_id, content, content_type = self._message_queue.popleft() self.chat_stream.send_message(content, content_type, message_id=message_id) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): self.room.log.debug('chat session for {} started'.format(self.account.id)) notification.center.post_notification('ChatSessionDidStart', sender=self) self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.chat_stream) self.sip_session = None self.chat_stream = None self.end() self.room.log.debug('chat session for {} ended'.format(self.account.id)) notification.center.post_notification('ChatSessionDidEnd', sender=self, data=notification.data) def _NH_SIPSessionDidFail(self, notification): notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.chat_stream) self.sip_session = None self.chat_stream = None self.end() self.room.log.error('chat session for {} failed: {}'.format(self.account.id, notification.data.failure_reason)) notification.center.post_notification('ChatSessionDidFail', sender=self, data=notification.data) # noinspection PyUnusedLocal def _NH_SIPSessionNewProposal(self, notification): self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): # sylkserver's SIP Session class doesn't implement the transfer API # self.sip_session.reject_transfer(403) pass def _NH_ChatStreamGotMessage(self, notification): self.chat_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') notification.center.post_notification('ChatSessionGotMessage', sender=self, data=notification.data) def _NH_ChatStreamGotComposingIndication(self, notification): notification.center.post_notification('ChatSessionGotComposingIndication', sender=self, data=notification.data) def _NH_ChatStreamDidSendMessage(self, notification): notification.center.post_notification('ChatSessionDidSendMessage', sender=self, data=notification.data) def _NH_ChatStreamDidDeliverMessage(self, notification): notification.center.post_notification('ChatSessionDidDeliverMessage', sender=self, data=notification.data) def _NH_ChatStreamDidNotDeliverMessage(self, notification): notification.center.post_notification('ChatSessionDidNotDeliverMessage', sender=self, data=notification.data) diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py index 8962d29..336cfd8 100644 --- a/sylk/applications/webrtcgateway/models/sylkrtc.py +++ b/sylk/applications/webrtcgateway/models/sylkrtc.py @@ -1,650 +1,651 @@ from application.python import subclasses from .jsonobjects import BooleanProperty, IntegerProperty, StringProperty, ArrayProperty, ObjectProperty, FixedValueProperty, LimitedChoiceProperty, AbstractObjectProperty, AbstractProperty from .jsonobjects import JSONObject, JSONArray, StringArray, CompositeValidator from .validators import AORValidator, DisplayNameValidator, LengthValidator, UniqueItemsValidator from sipsimple.util import ISOTimestamp # Base models (these are abstract and should not be used directly) class SylkRTCRequestBase(JSONObject): transaction = StringProperty() class SylkRTCResponseBase(JSONObject): transaction = StringProperty() class AccountRequestBase(SylkRTCRequestBase): account = StringProperty(validator=AORValidator()) class SessionRequestBase(SylkRTCRequestBase): session = StringProperty() class VideoroomRequestBase(SylkRTCRequestBase): session = StringProperty() class AccountEventBase(JSONObject): sylkrtc = FixedValueProperty('account-event') account = StringProperty(validator=AORValidator()) class SessionEventBase(JSONObject): sylkrtc = FixedValueProperty('session-event') session = StringProperty() class VideoroomEventBase(JSONObject): sylkrtc = FixedValueProperty('videoroom-event') session = StringProperty() class AccountRegistrationStateEvent(AccountEventBase): event = FixedValueProperty('registration-state') class SessionStateEvent(SessionEventBase): event = FixedValueProperty('state') class VideoroomSessionStateEvent(VideoroomEventBase): event = FixedValueProperty('session-state') # Miscellaneous models class Header(JSONObject): name = StringProperty() value = StringProperty() class Headers(JSONArray): item_type = Header class SIPIdentity(JSONObject): uri = StringProperty(validator=AORValidator()) display_name = StringProperty(optional=True, validator=DisplayNameValidator()) class ICECandidate(JSONObject): candidate = StringProperty() sdpMLineIndex = IntegerProperty() sdpMid = StringProperty() class ICECandidates(JSONArray): item_type = ICECandidate class AORList(StringArray): list_validator = UniqueItemsValidator() item_validator = AORValidator() class VideoroomPublisher(JSONObject): id = StringProperty() uri = StringProperty(validator=AORValidator()) display_name = StringProperty(optional=True) class VideoroomPublishers(JSONArray): item_type = VideoroomPublisher class VideoroomActiveParticipants(StringArray): list_validator = CompositeValidator(UniqueItemsValidator(), LengthValidator(maximum=2)) class VideoroomSessionOptions(JSONObject): audio = BooleanProperty(optional=True) video = BooleanProperty(optional=True) bitrate = IntegerProperty(optional=True) class VideoroomRaisedHands(StringArray): list_validator = UniqueItemsValidator() class SharedFile(JSONObject): filename = StringProperty() filesize = IntegerProperty() uploader = ObjectProperty(SIPIdentity) # type: SIPIdentity session = StringProperty() class SharedFiles(JSONArray): item_type = SharedFile class TransferredFile(JSONObject): filename = StringProperty() filesize = IntegerProperty() sender = ObjectProperty(SIPIdentity) # type: SIPIdentity receiver = ObjectProperty(SIPIdentity) transfer_id = StringProperty() prefix = StringProperty() path = StringProperty() timestamp = StringProperty() until = StringProperty(optional=True) url = StringProperty(optional=True) filetype = StringProperty(optional=True) hash = StringProperty(optional=True) class FileTransferMessage(JSONObject): filename = StringProperty() filesize = IntegerProperty() sender = ObjectProperty(SIPIdentity) # type: SIPIdentity receiver = ObjectProperty(SIPIdentity) transfer_id = StringProperty() timestamp = StringProperty() until = StringProperty(optional=True) url = StringProperty(optional=True) filetype = StringProperty(optional=True) hash = StringProperty(optional=True) class DispositionNotifications(StringArray): list_validator = UniqueItemsValidator() class Message(JSONObject): contact = StringProperty(validator=AORValidator()) timestamp = StringProperty() disposition = ArrayProperty(DispositionNotifications, optional=True) message_id = StringProperty() content_type = StringProperty() content = StringProperty() direction = StringProperty(optional=True) state = LimitedChoiceProperty(['delivered', 'failed', 'displayed', 'forbidden', 'error', 'accepted', 'pending', 'received'], optional=True) def __init__(self, **kw): if 'msg_timestamp' in kw: kw['timestamp'] = str(ISOTimestamp(kw['msg_timestamp'])) del kw['msg_timestamp'] super(Message, self).__init__(**kw) class ContactMessages(JSONArray): item_type = Message class MessageHistoryData(JSONObject): account = StringProperty(validator=AORValidator()) messages = ArrayProperty(ContactMessages) class AccountMessageRemoveEventData(JSONObject): contact = StringProperty() message_id = StringProperty() class AccountMarkConversationReadEventData(JSONObject): contact = StringProperty() class AccountConversationRemoveEventData(JSONObject): contact = StringProperty() class AccountDispositionNotificationEventData(JSONObject): message_id = StringProperty() state = LimitedChoiceProperty(['accepted', 'delivered', 'displayed', 'failed', 'processed', 'stored', 'forbidden', 'error']) message_timstamp = StringProperty() code = IntegerProperty() reason = StringProperty() class IncomingHeaderPrefixes(StringArray): list_validator = UniqueItemsValidator() # Response models class AckResponse(SylkRTCResponseBase): sylkrtc = FixedValueProperty('ack') class ErrorResponse(SylkRTCResponseBase): sylkrtc = FixedValueProperty('error') error = StringProperty() # Connection events class ReadyEvent(JSONObject): sylkrtc = FixedValueProperty('ready-event') class LookupPublicKeyEvent(JSONObject): sylkrtc = FixedValueProperty('lookup-public-key-event') uri = StringProperty(validator=AORValidator()) public_key = StringProperty(optional=True) # Account events class AccountIncomingSessionEvent(AccountEventBase): event = FixedValueProperty('incoming-session') session = StringProperty() originator = ObjectProperty(SIPIdentity) # type: SIPIdentity sdp = StringProperty() call_id = StringProperty() headers = AbstractProperty(optional=True) class AccountMissedSessionEvent(AccountEventBase): event = FixedValueProperty('missed-session') originator = ObjectProperty(SIPIdentity) # type: SIPIdentity class AccountConferenceInviteEvent(AccountEventBase): event = FixedValueProperty('conference-invite') room = StringProperty(validator=AORValidator()) session_id = StringProperty() originator = ObjectProperty(SIPIdentity) # type: SIPIdentity class AccountMessageEvent(AccountEventBase): event = FixedValueProperty('message') sender = ObjectProperty(SIPIdentity) # type: SIPIdentity timestamp = StringProperty() disposition_notification = ArrayProperty(DispositionNotifications, optional=True) message_id = StringProperty() content_type = StringProperty() content = StringProperty() direction = StringProperty(optional=True) class AccountDispositionNotificationEvent(AccountEventBase): event = FixedValueProperty('disposition-notification') message_id = StringProperty() message_timestamp = StringProperty() state = LimitedChoiceProperty(['accepted', 'delivered', 'displayed', 'failed', 'processed', 'stored', 'forbidden', 'error']) code = IntegerProperty() reason = StringProperty() class AccountSyncConversationsEvent(AccountEventBase): event = FixedValueProperty('sync-conversations') messages = ArrayProperty(ContactMessages) class AccountSyncEvent(AccountEventBase): event = FixedValueProperty('sync') type = StringProperty() action = StringProperty() content = AbstractObjectProperty() class AccountRegisteringEvent(AccountRegistrationStateEvent): state = FixedValueProperty('registering') class AccountRegisteredEvent(AccountRegistrationStateEvent): state = FixedValueProperty('registered') class AccountRegistrationFailedEvent(AccountRegistrationStateEvent): state = FixedValueProperty('failed') reason = StringProperty(optional=True) # Session events class SessionProgressEvent(SessionStateEvent): state = FixedValueProperty('progress') class SessionEarlyMediaEvent(SessionStateEvent): state = FixedValueProperty('early-media') sdp = StringProperty(optional=True) call_id = StringProperty(optional=True) class SessionAcceptedEvent(SessionStateEvent): state = FixedValueProperty('accepted') sdp = StringProperty(optional=True) # missing for incoming sessions call_id = StringProperty(optional=True) headers = AbstractProperty(optional=True) class SessionEstablishedEvent(SessionStateEvent): state = FixedValueProperty('established') class SessionTerminatedEvent(SessionStateEvent): state = FixedValueProperty('terminated') reason = StringProperty(optional=True) class SessionMessageEvent(SessionEventBase): event = FixedValueProperty('message') sender = ObjectProperty(SIPIdentity) # type: SIPIdentity timestamp = StringProperty() disposition_notification = ArrayProperty(DispositionNotifications, optional=True) message_id = StringProperty() content_type = StringProperty() content = StringProperty() direction = StringProperty(optional=True) class SessionMessageDispositionNotificationEvent(SessionEventBase): event = FixedValueProperty('disposition-notification') message_id = StringProperty() message_timestamp = StringProperty() state = LimitedChoiceProperty(['accepted', 'delivered', 'displayed', 'failed', 'processed', 'stored', 'forbidden', 'error']) code = IntegerProperty() reason = StringProperty() # Video room events class VideoroomConfigureEvent(VideoroomEventBase): event = FixedValueProperty('configure') originator = StringProperty() active_participants = ArrayProperty(VideoroomActiveParticipants) # type: VideoroomActiveParticipants class VideoroomSessionProgressEvent(VideoroomSessionStateEvent): state = FixedValueProperty('progress') class VideoroomSessionAcceptedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('accepted') sdp = StringProperty() video = BooleanProperty(optional=True, default=True) audio = BooleanProperty(optional=True, default=True) class VideoroomSessionEstablishedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('established') class VideoroomSessionTerminatedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('terminated') reason = StringProperty(optional=True) class VideoroomFeedAttachedEvent(VideoroomEventBase): event = FixedValueProperty('feed-attached') feed = StringProperty() sdp = StringProperty() class VideoroomFeedEstablishedEvent(VideoroomEventBase): event = FixedValueProperty('feed-established') feed = StringProperty() class VideoroomInitialPublishersEvent(VideoroomEventBase): event = FixedValueProperty('initial-publishers') publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers class VideoroomPublishersJoinedEvent(VideoroomEventBase): event = FixedValueProperty('publishers-joined') publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers class VideoroomPublishersLeftEvent(VideoroomEventBase): event = FixedValueProperty('publishers-left') publishers = ArrayProperty(StringArray) # type: StringArray class VideoroomFileSharingEvent(VideoroomEventBase): event = FixedValueProperty('file-sharing') files = ArrayProperty(SharedFiles) # type: SharedFiles class VideoroomMessageEvent(VideoroomEventBase): event = FixedValueProperty('message') type = LimitedChoiceProperty(['normal', 'status']) content = StringProperty() content_type = StringProperty() sender = ObjectProperty(SIPIdentity) # type: SIPIdentity timestamp = StringProperty() class VideoroomComposingIndicationEvent(VideoroomEventBase): event = FixedValueProperty('composing-indication') state = StringProperty() refresh = IntegerProperty() content_type = StringProperty() sender = ObjectProperty(SIPIdentity) # type: SIPIdentity class VideoroomMessageDeliveryEvent(VideoroomEventBase): event = FixedValueProperty('message-delivery') message_id = StringProperty() delivered = BooleanProperty() code = IntegerProperty() reason = StringProperty() class VideoroomMuteAudioEvent(VideoroomEventBase): event = FixedValueProperty('mute-audio') originator = StringProperty() class VideoroomRaisedHandsEvent(VideoroomEventBase): event = FixedValueProperty('raised-hands') raised_hands = ArrayProperty(VideoroomRaisedHands) # Ping request model, can be used to check connectivity from client class PingRequest(SylkRTCRequestBase): sylkrtc = FixedValueProperty('ping') # Lookup Public key model class LookupPublicKeyRequest(SylkRTCRequestBase): sylkrtc = FixedValueProperty('lookup-public-key') uri = StringProperty(validator=AORValidator()) # Account request models class AccountAddRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-add') password = StringProperty(validator=LengthValidator(minimum=1, maximum=9999)) display_name = StringProperty(optional=True) user_agent = StringProperty(optional=True) incoming_header_prefixes = ArrayProperty(IncomingHeaderPrefixes, optional=True) class AccountRemoveRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-remove') class AccountRegisterRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-register') class AccountUnregisterRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-unregister') class AccountDeviceTokenRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-devicetoken') token = StringProperty() platform = StringProperty() device = StringProperty() silent = BooleanProperty(default=False) app = StringProperty() class AccountMessageRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-message') uri = StringProperty(validator=AORValidator()) message_id = StringProperty() content = StringProperty() content_type = StringProperty() timestamp = StringProperty() server_generated = BooleanProperty(optional=True) class AccountDispositionNotificationRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-disposition-notification') uri = StringProperty(validator=AORValidator()) message_id = StringProperty() state = LimitedChoiceProperty(['delivered', 'failed', 'displayed', 'forbidden', 'error']) timestamp = StringProperty() class AccountSyncConversationsRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-sync-conversations') message_id = StringProperty(optional=True) + since = StringProperty(optional=True) class AccountMarkConversationReadRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-mark-conversation-read') contact = StringProperty(validator=AORValidator()) class AccountMessageRemoveRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-remove-message') message_id = StringProperty() contact = StringProperty(validator=AORValidator()) class AccountConversationRemoveRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-remove-conversation') contact = StringProperty(validator=AORValidator()) # Session request models class SessionCreateRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-create') account = StringProperty(validator=AORValidator()) uri = StringProperty(validator=AORValidator()) sdp = StringProperty() headers = ArrayProperty(Headers, optional=True) class SessionAnswerRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-answer') sdp = StringProperty() headers = ArrayProperty(Headers, optional=True) class SessionTrickleRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-trickle') candidates = ArrayProperty(ICECandidates) # type: ICECandidates class SessionTerminateRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-terminate') class SessionMessageRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-message') message_id = StringProperty() content = StringProperty() content_type = StringProperty() timestamp = StringProperty() # Videoroom request models class VideoroomJoinRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-join') account = StringProperty(validator=AORValidator()) uri = StringProperty(validator=AORValidator()) sdp = StringProperty() audio = BooleanProperty(optional=True, default=True) video = BooleanProperty(optional=True, default=True) class VideoroomLeaveRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-leave') class VideoroomConfigureRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-configure') active_participants = ArrayProperty(VideoroomActiveParticipants) # type: VideoroomActiveParticipants class VideoroomFeedAttachRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-attach') publisher = StringProperty() feed = StringProperty() class VideoroomFeedAnswerRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-answer') feed = StringProperty() sdp = StringProperty() class VideoroomFeedDetachRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-detach') feed = StringProperty() class VideoroomInviteRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-invite') participants = ArrayProperty(AORList) # type: AORList class VideoroomSessionTrickleRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-session-trickle') candidates = ArrayProperty(ICECandidates) # type: ICECandidates class VideoroomSessionUpdateRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-session-update') options = ObjectProperty(VideoroomSessionOptions) # type: VideoroomSessionOptions class VideoroomMessageRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-message') message_id = StringProperty() content = StringProperty() content_type = StringProperty() class VideoroomComposingIndicationRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-composing-indication') state = LimitedChoiceProperty(['active', 'idle']) refresh = IntegerProperty(optional=True) class VideoroomMuteAudioParticipantsRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-mute-audio-participants') class VideoroomToggleHandRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-toggle-hand') session_id = StringProperty(optional=True) # SylkRTC request to model mapping class ProtocolError(Exception): pass class SylkRTCRequest(object): __classmap__ = {cls.sylkrtc.value: cls for cls in subclasses(SylkRTCRequestBase) if hasattr(cls, 'sylkrtc')} @classmethod def from_message(cls, message): try: request_type = message['sylkrtc'] except KeyError: raise ProtocolError('could not get WebSocket message type') try: request_class = cls.__classmap__[request_type] except KeyError: raise ProtocolError('unknown WebSocket request: %s' % request_type) return request_class(**message) diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index d9c0275..adcde6a 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,736 +1,750 @@ import datetime import json import pickle as pickle import os from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from sipsimple.threading import run_in_thread from sipsimple.util import ISOTimestamp from shutil import rmtree from twisted.internet import defer from types import SimpleNamespace from sylk.configuration import ServerConfig from .configuration import CassandraConfig from .datatypes import FileTransferData from .errors import StorageError from .logger import log __all__ = 'TokenStorage', # TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, # and thus it's ok to scrap it after a reasonable amount of time. CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.policies import DCAwareRoundRobinPolicy from .models.storage.cassandra import PushTokens from .models.storage.cassandra import ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table class FileTokenStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params, user_agent): try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: token = contact_params['pn_tok'] background_token = None data = { 'device_id': contact_params['pn_device'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], 'app_id': contact_params['pn_app'], 'user_agent': user_agent, 'background_token': background_token, 'device_token': token } key = f"{data['app_id']}-{data['device_id']}" if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} # Remove old storage layout based on device id if contact_params['pn_device'] in self._tokens[account]: del self._tokens[account][contact_params['pn_device']] # Remove old storage layout based on token if token in self._tokens[account]: del self._tokens[account][token] # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: del self._tokens[account][contact_params['pn_tok']] except (IndexError, KeyError): pass self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, app_id, device_id): key = f'{app_id}-{device_id}' try: del self._tokens[account][key] except KeyError: pass self._save() def removeAll(self, account): try: del self._tokens[account] except KeyError: pass self._save() class CassandraConnection(object, metaclass=Singleton): @run_in_thread('cassandra') def __init__(self): try: self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4) except NoHostAvailable: log.error("Not able to connect to any of the Cassandra contact points") raise StorageError class CassandraTokenStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'silent': device.silent, 'app_id': device.app_id, 'background_token': device.background_token} deferred.callback(tokens) return tokens query_tokens(key) return deferred @run_in_thread('cassandra') def add(self, account, contact_params, user_agent): username, domain = account.split('@', 1) token = contact_params['pn_tok'] background_token = None if contact_params['pn_type'] == 'ios': try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: pass # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: old_token = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain)[0] except (IndexError, LWTException): pass else: if old_token.device_token == contact_params['pn_tok']: old_token.delete() try: PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], device_token=token, background_token=background_token, platform=contact_params['pn_type'], silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing token failed: {e}') raise StorageError @run_in_thread('cassandra') def remove(self, account, app_id, device_id): username, domain = account.split('@', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass class FileMessageStorage(object): def __init__(self): self._public_keys = defaultdict() self._accounts = defaultdict() self._storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') def _json_dateconverter(self, o): if isinstance(o, datetime.datetime): return o.__str__() @run_in_thread('file-io') def _save(self): with open(os.path.join(self._storage_path, 'accounts.json'), 'w+') as f: json.dump(self._accounts, f, default=self._json_dateconverter) with open(os.path.join(self._storage_path, 'public_keys.json'), 'w+') as f: json.dump(self._public_keys, f, default=self._json_dateconverter) def _save_messages(self, account, messages): with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'w+') as f: json.dump(messages, f, default=self._json_dateconverter, indent=4) def _save_id_by_timestamp(self, account, ids): with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'w+') as f: json.dump(ids, f, default=self._json_dateconverter) @run_in_thread('file-io') def load(self): makedirs(self._storage_path) try: accounts = json.load(open(os.path.join(self._storage_path, 'accounts.json'), 'r')) except (OSError, IOError): pass else: self._accounts.update(accounts) try: public_keys = json.load(open(os.path.join(self._storage_path, 'public_keys.json'), 'r')) except (OSError, IOError): pass else: self._public_keys.update(public_keys) def _load_id_by_timestamp(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'r') as f: data = json.load(f) return data except (OSError, IOError) as e: raise e def _load_messages(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'r') as f: messages = json.load(f) return messages except (OSError, IOError) as e: raise e def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('file-io') - def query(account, message_id): + def query(account, message_id, since): messages = [] timestamp = None try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): deferred.callback(messages) return if message_id is not None: try: timestamp = id_by_timestamp[message_id] except KeyError: deferred.callback(messages) return else: timestamp = ISOTimestamp(timestamp) + if since and not message_id: + timestamp = ISOTimestamp(since) + try: messages = self._load_messages(account) except (OSError, IOError): deferred.callback(messages) return else: if timestamp is not None: messages = [message for message in messages if ISOTimestamp(message['created_at']) > timestamp] deferred.callback(messages) else: deferred.callback(messages) - query(key[0], key[1]) + try: + since = key[2] + except IndexError: + since = None + query(key[0], key[1], since) return deferred def get_account(self, account): try: return SimpleNamespace(account=account, **self._accounts[account]) except KeyError: return None def get_account_token(self, account): try: if datetime.datetime.now() < datetime.datetime.fromisoformat(str(self._accounts[account]['token_expire'])): return self._accounts[account]['api_token'] return None except KeyError: return None def remove_account_token(self, account): if account in self._accounts: self._accounts[account]['api_token'] = '' self._save() def add_account(self, account): timestamp = datetime.datetime.now() if account not in self._accounts: self._accounts[account] = {'last_login': timestamp} self._save() else: self._accounts[account]['last_login'] = timestamp self._save() def get_public_key(self, account): try: return self._public_keys[account]['content'] except KeyError: return None def remove_account(self, account): if account in self._accounts: del self._accounts[account] self._save() def remove_public_key(self, account): if account in self._public_keys: del self.public_keys[account] self._save() def add_account_token(self, account, token): timestamp = datetime.datetime.now() if account not in self._accounts: log.error(f'Updating API token for {account} failed') return StorageError self._accounts[account]['api_token'] = token self._accounts[account]['token_expire'] = timestamp + datetime.timedelta(seconds=26784000) self._save() @run_in_thread('file-io') def mark_conversation_read(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): return else: for idx, message in enumerate(messages): if message['contact'] == contact: message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) @run_in_thread('file-io') def update(self, account, state, message_id): try: messages = self._load_messages(account) except (OSError, IOError): return try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: for idx, message in enumerate(messages): if message['created_at'] == timestamp and message['message_id'] == message_id: if message['state'] != 'received': message['state'] = state if state == 'delivered': try: message['disposition'].remove('positive-delivery') except ValueError: pass elif state == 'displayed': message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) break @run_in_thread('file-io') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() messages = [] id_by_timestamp = {} try: messages = self._load_messages(account) except (OSError, IOError): makedirs(os.path.join(self._storage_path, account[0])) timestamp_not_found = True try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): pass else: try: created_at = id_by_timestamp[message_id] except KeyError: pass else: timestamp_not_found = False if content_type == 'message/imdn+json': return items = [n for n in messages if n['created_at'] == created_at and n['message_id'] == message_id] if len(items) == 1: return if content_type == 'text/pgp-public-key': key = account if direction == "outgoing" else contact self._public_keys[key] = {'content': content, 'created_at': timestamp} self._save() return if content_type == 'text/pgp-private-key': if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) return if not isinstance(disposition_notification, list) and disposition_notification == '': disposition_notification = [] message = {'account': account, 'direction': direction, 'contact': contact, 'content_type': content_type, 'content': content, 'created_at': timestamp, 'message_id': message_id, 'disposition': disposition_notification, 'state': state, 'msg_timestamp': msg_timestamp} messages.append(message) self._save_messages(account, messages) if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) @run_in_thread('file-io') def removeChat(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): pass else: messages[:] = [message for message in messages if message['contact'] != contact] self._save_messages(account, messages) @run_in_thread('file-io') def removeMessage(self, account, message_id): try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: try: messages = self._load_messages(account) except (OSError, IOError): return else: item = [n for n in messages if n['created_at'] == timestamp and n['message_id'] == message_id] if len(item) == 1: if item[0]['content_type'] == 'application/sylk-file-transfer': data = FileTransferData('', '', '', item[0]['message_id'], item[0]['account'], item[0]['contact']) try: rmtree(data.path) except FileNotFoundError: pass else: print(f"Removed {data.path}") messages.remove(item[0]) self._save_messages(account, messages) class CassandraMessageStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') - def query_messages(key, message_id): + def query_messages(key, message_id, since): messages = [] try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except (IndexError, InvalidRequest): timestamp = datetime.datetime.now() - datetime.timedelta(days=3) else: timestamp = timestamp.created_at + + if since and not message_id: + timestamp = ISOTimestamp(since) + for message in ChatMessage.objects(ChatMessage.account == key, ChatMessage.created_at > timestamp): timestamp_naive = message.msg_timestamp try: timestamp_utc = timestamp_naive.replace(tzinfo=datetime.timezone.utc) except AttributeError: continue message.msg_timestamp = timestamp_utc messages.append(message) deferred.callback(messages) - - query_messages(key[0], key[1]) + try: + since = key[2] + except IndexError: + since = None + query_messages(key[0], key[1], since) return deferred def get_account(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account) query_tokens(account) return deferred def get_account_token(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account.api_token) query_tokens(account) return deferred @run_in_thread('cassandra') def add_account(self, account): timestamp = datetime.datetime.now() try: ChatAccount.create(account=account, last_login=timestamp) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing account failed: {e}') raise StorageError @run_in_thread('cassandra') def add_account_token(self, account, token): try: chat_account = ChatAccount.objects(account=account)[0] chat_account.ttl(2678400).update(api_token=token) except IndexError: log.error(f'Updating API token for {account} failed') raise StorageError @run_in_thread('cassandra') def mark_conversation_read(self, account, contact): for message in ChatMessage.objects(ChatMessage.account == account): if message.contact == contact: if message.content_type == 'application/sylk-conversation-read': message.delete() else: message.disposition.clear() message.save() def get_public_key(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_key(account): try: public_key = PublicKey.objects(PublicKey.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(public_key.public_key) query_key(account) return deferred @run_in_thread('cassandra') def update(self, account, state, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: message = ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, ChatMessage.message_id == message_id)[0] except IndexError: pass else: if message.state != 'received': message.state = state if state == 'delivered': try: message.disposition.remove('positive-delivery') except ValueError: pass elif state == 'displayed': message.disposition.clear() message.save() @run_in_thread('cassandra') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() timestamp_not_found = True try: created_at = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: pass else: timestamp_not_found = False timestamp = created_at.created_at if content_type == 'message/imdn+json': return if ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == created_at.created_at, ChatMessage.message_id == message_id).count() != 0: return if content_type == 'text/pgp-public-key': try: PublicKey.create(account=account if direction == "outgoing" else contact, public_key=content, created_at=timestamp) ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing public key failed: {e}') raise StorageError else: return if content_type == 'text/pgp-private-key': if timestamp_not_found: try: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest): pass return try: ChatMessage.create(account=account, direction=direction, contact=contact, content_type=content_type, content=content, created_at=timestamp, message_id=message_id, disposition=disposition_notification, state=state, msg_timestamp=msg_timestamp) if timestamp_not_found: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing message failed: {e}') raise StorageError @run_in_thread('cassandra') def removeChat(self, account, contact): try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: for message in messages: if message.contact == contact: message.delete() @run_in_thread('cassandra') def removeMessage(self, account, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: messages = ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, # ChatMessage.message_id == message_id).if_exists().delete() ChatMessage.message_id == message_id) for message in messages: if message.content_type == 'application/sylk-file-transfer': data = FileTransferData('', '', '', message.message_id, message.account, message.contact) try: rmtree(data.path) except FileNotFoundError: pass else: print(f"Removed {data.path}") messages.if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraTokenStorage() else: return FileTokenStorage() class MessageStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraMessageStorage() else: return FileMessageStorage()