diff --git a/sylk/applications/webrtcgateway/__init__.py b/sylk/applications/webrtcgateway/__init__.py index b2fdee2..973de87 100644 --- a/sylk/applications/webrtcgateway/__init__.py +++ b/sylk/applications/webrtcgateway/__init__.py @@ -1,470 +1,676 @@ +import base64 import json +import hashlib import random +import os import secrets import uuid +import urllib.parse from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null +from application.system import unlink from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, FromHeader, ToHeader, Message, RouteHeader, Header, Route, Request from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads.imdn import IMDNDocument from sipsimple.streams.msrp.chat import CPIMPayload, CPIMParserError, Message as SIPMessage +from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp -from twisted.internet import defer +from twisted.internet import defer, reactor from zope.interface import implementer from sylk.applications import SylkApplication from sylk.configuration import SIPConfig +from sylk.session import IllegalStateError from sylk.web import server from .configuration import GeneralConfig from .logger import log from .models import sylkrtc from .storage import TokenStorage, MessageStorage from .web import WebHandler, AdminWebHandler from . import push @implementer(IObserver) class WebRTCGatewayApplication(SylkApplication): def __init__(self): self.web_handler = WebHandler() self.admin_web_handler = AdminWebHandler() def start(self): self.web_handler.start() self.admin_web_handler.start() # Load tokens from the storage token_storage = TokenStorage() token_storage.load() # Setup message storage message_storage = MessageStorage() message_storage.load() def stop(self): self.web_handler.stop() self.admin_web_handler.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def incoming_session(self, session): - log.debug('New incoming session {session.call_id} from sip:{uri.user}@{uri.host} rejected'.format(session=session, uri=session.remote_identity.uri)) - session.reject(403) + log.info('New incoming session {session.call_id} from sip:{uri.user}@{uri.host}'.format(session=session, uri=session.remote_identity.uri)) + transfer_streams = [stream for stream in session.proposed_streams if stream.type == 'file-transfer'] + + if not transfer_streams: + log.info(u'Session rejected: invalid media') + session.reject(488) + return + + transfer_stream = transfer_streams[0] + + if transfer_stream.direction == 'sendonly': + # file transfer 'pull' + log.info('Session rejected: requested file not found') + session.reject(404) + return + + sender = f'{session.remote_identity.uri.user}@{session.remote_identity.uri.host}' + receiver = f'{session.local_identity.uri.user}@{session.local_identity.uri.host}' + + message_storage = MessageStorage() + account = defer.maybeDeferred(message_storage.get_account, receiver) + account.addCallback(lambda result: self._check_receiver_session(result, session, transfer_stream)) + + sender_account = defer.maybeDeferred(message_storage.get_account, sender) + sender_account.addCallback(lambda result: self._check_sender_session(result, session, transfer_stream)) + + d1 = defer.DeferredList([account, sender_account], consumeErrors=True) + d1.addCallback(lambda result: self._handle_lookup_result(result, session, [transfer_stream])) + + NotificationCenter().add_observer(self, sender=session) + + def _encode_and_hash_uri(self, uri): + return base64.b64encode(hashlib.md5(uri.encode('utf-8')).digest()).rstrip(b'=\n').decode('utf-8') + + def _set_save_directory(self, session, transfer_stream, sender=None, receiver=None): + transfer_id = str(uuid.uuid4()) + settings = SIPSimpleSettings() + if receiver is None: + receiver = f'{session.local_identity.uri.user}@{session.local_identity.uri.host}' + hashed_receiver = self._encode_and_hash_uri(receiver) + hashed_sender = self._encode_and_hash_uri(sender) + prefix = hashed_receiver[:1] + folder = os.path.join(settings.file_transfer.directory.normalized, prefix, hashed_receiver, hashed_sender, transfer_id) + else: + sender = f'{session.remote_identity.uri.user}@{session.remote_identity.uri.host}' + hashed_receiver = self._encode_and_hash_uri(receiver) + hashed_sender = self._encode_and_hash_uri(sender) + prefix = hashed_sender[:1] + folder = os.path.join(settings.file_transfer.directory.normalized, prefix, hashed_sender, hashed_receiver, transfer_id) + + metadata = sylkrtc.TransferredFile(transfer_id=transfer_id, + sender=sylkrtc.SIPIdentity(uri=sender, display_name=session.remote_identity.display_name), + receiver=sylkrtc.SIPIdentity(uri=receiver, display_name=session.local_identity.display_name), + filename=transfer_stream.file_selector.name, + prefix=prefix, + path=folder, + filesize=transfer_stream.file_selector.size, + timestamp=str(ISOTimestamp.now())) + + transfer_stream.handler.save_directory = folder + session.metadata = metadata + log.info('File transfer from %s to %s will be saved to %s/%s' % (metadata.sender.uri, metadata.receiver.uri, metadata.path, metadata.filename)) + + def _check_receiver_session(self, account, session, transfer_stream): + if account is None: + raise Exception("Receiver account for filetransfer not found") + + self._set_save_directory(session, transfer_stream, receiver=account.account) + + def _check_sender_session(self, account, session, transfer_stream): + if account is not None: + return + + self._set_save_directory(session, transfer_stream, sender=account.account) + + def _handle_lookup_result(self, result, session, streams): + for (success, value) in result: + if not success: + self._reject_session(session, value.getErrorMessage()) + return + self._accept_session(session, streams) + + def _reject_session(self, session, error): + log.warning(f'File transfer rejected: {error}') + session.reject(404) + + def _accept_session(self, session, streams): + try: + session.accept(streams) + except IllegalStateError: + session.reject(500) def incoming_subscription(self, request, data): request.reject(405) def incoming_referral(self, request, data): request.reject(405) def incoming_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return if content_type == 'message/cpim': try: cpim_message = CPIMPayload.decode(data.body) except CPIMParserError: log.warning('SIP message from %s to %s rejected: CPIM parse error' % (from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_request.answer(400) return else: content_type = cpim_message.content_type log.info('received SIP message (%s) from %s to %s' % (content_type, from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_request.answer(200) message_handler = MessageHandler() message_handler.incoming_message(data) + def _NH_SIPSessionDidStart(self, notification): + session = notification.sender + try: + transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer') + except StopIteration: + pass + else: + transfer_handler = FileTransferHandler() + transfer_handler.init_incoming(transfer_stream) + + def _NH_SIPSessionDidEnd(self, notification): + session = notification.sender + notification.center.remove_observer(self, sender=session) + + def _NH_SIPSessionDidFail(self, notification): + session = notification.sender + notification.center.remove_observer(self, sender=session) + log.info('File transfer from %s to %s failed: %s (%s)' % (session.remote_identity.uri, session.local_identity.uri, notification.data.reason, notification.data.failure_reason)) + class ParsedSIPMessage(SIPMessage): __slots__ = 'message_id', 'disposition', 'destination' def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, message_id=None, disposition=None, destination=None): super(ParsedSIPMessage, self).__init__(content, content_type, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers) self.message_id = message_id self.disposition = disposition self.destination = destination @implementer(IObserver) class ReplicatedMessage(Message): def __init__(self, from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=None): super(ReplicatedMessage, self).__init__(from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=None) self._request = Request("MESSAGE", from_header.uri, from_header, to_header, route_header, credentials=credentials, extra_headers=extra_headers, content_type=content_type, body=body if isinstance(body, bytes) else body.encode()) @implementer(IObserver) class MessageHandler(object): def __init__(self): self.message_storage = MessageStorage() self.resolver = DNSLookup() self.from_header = None self.to_header = None self.content_type = None self.from_sip = None self.body = None self.parsed_message = None def _lookup_sip_target_route(self, uri): proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: sip_uri = SIPURI.parse('sip:%s' % uri) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e)) if not routes: raise DNSLookupError('DNS lookup error: no results found') route = random.choice([r for r in routes if r.transport == routes[0].transport]) log.debug('DNS lookup for SIP message proxy for {} yielded {}'.format(uri, route)) return route def _parse_message(self): cpim_message = None if self.content_type == "message/cpim": cpim_message = CPIMPayload.decode(self.body) body = cpim_message.content if isinstance(cpim_message.content, str) else cpim_message.content.decode() content_type = cpim_message.content_type sender = cpim_message.sender or self.from_header disposition = next(([item.strip() for item in header.value.split(',')] for header in cpim_message.additional_headers if header.name == 'Disposition-Notification'), None) message_id = next((header.value for header in cpim_message.additional_headers if header.name == 'Message-ID'), str(uuid.uuid4())) else: body = self.body.decode('utf-8') sender = self.from_header disposition = None message_id = str(uuid.uuid4()) content_type = str(self.content_type) timestamp = str(cpim_message.timestamp) if cpim_message is not None and cpim_message.timestamp is not None else str(ISOTimestamp.now()) sender = sylkrtc.SIPIdentity(uri=str(sender.uri), display_name=sender.display_name) destination = sylkrtc.SIPIdentity(uri=str(self.to_header.uri), display_name=self.to_header.display_name) self.parsed_message = ParsedSIPMessage(body, content_type, sender=sender, disposition=disposition, message_id=message_id, timestamp=timestamp, destination=destination) def _send_public_key(self, from_header, to_header, public_key): if public_key: self.outgoing_message(from_header, public_key, 'text/pgp-public-key', str(to_header)) def _handle_generate_token(self): account = f'{self.from_header.uri.user}@{self.from_header.uri.host}' log.info(f'Adding {account} for storing messages') self.message_storage.add_account(account) token = secrets.token_urlsafe() self.outgoing_message(self.from_header.uri, json.dumps({'token': token, 'url': f'{server.url}/webrtcgateway/messages/history/{account}'}), 'application/sylk-api-token') self.message_storage.add_account_token(account=account, token=token) def _handle_lookup_pgp_key(self): account = f'{self.to_header.uri.user}@{self.to_header.uri.host}' public_key = self.message_storage.get_public_key(account) log.info(f'Public key lookup for {account}') if isinstance(public_key, defer.Deferred): public_key.addCallback(lambda result: self._send_public_key(self.from_header.uri, self.to_header.uri, result)) else: self._send_public_key(self.from_header.uri, self.to_header.uri, public_key) def _handle_message_remove(self): account = f'{self.from_header.uri.user}@{self.from_header.uri.host}' contact = f'{self.to_header.uri.user}@{self.to_header.uri.host}' message_id = self.parsed_message.content self.message_storage.removeMessage(account=account, message_id=message_id) content = sylkrtc.AccountMessageRemoveEventData(contact=contact, message_id=message_id) self.message_storage.add(account=account, contact=contact, direction='', content=json.dumps(content.__data__), content_type='application/sylk-message-remove', timestamp=str(ISOTimestamp.now()), disposition_notification='', message_id=str(uuid.uuid4())) event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content) self.outgoing_message(self.from_header.uri, json.dumps(content.__data__), 'application/sylk-message-remove', str(self.to_header.uri)) notification_center = NotificationCenter() notification_center.post_notification(name='SIPApplicationGotAccountRemoveMessage', sender=account, data=event) def remove_message_from_receiver(msg_id, messages): for message in messages: if message.message_id == msg_id and message.direction == 'incoming': account = message.account message_id = message.message_id self.message_storage.removeMessage(account=account, message_id=message_id) content = sylkrtc.AccountMessageRemoveEventData(contact=message.contact, message_id=message_id) self.message_storage.add(account=account, contact=message.contact, direction='', content=json.dumps(content.__data__), content_type='application/sylk-message-remove', timestamp=str(ISOTimestamp.now()), disposition_notification='', message_id=str(uuid.uuid4())) event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content) notification_center.post_notification(name='SIPApplicationGotAccountRemoveMessage', sender=account, data=event) log.info("Removed receiver message") break messages = self.message_storage[[contact, '']] if isinstance(messages, defer.Deferred): messages.addCallback(lambda result: remove_message_from_receiver(msg_id=message_id, messages=result)) else: remove_message_from_receiver(msg_id=message_id, messages=messages) def _store_message_for_sender(self, account): if account is None: log.info('not storing %s message from non-existent account %s to %s' % (self.parsed_message.content_type, self.from_header.uri, '%s@%s' % (self.to_header.uri.user, self.to_header.uri.host))) return log.debug(f"storage is enabled for originator {account.account}") message = None ignored_content_types = ("application/im-iscomposing+xml", 'text/pgp-public-key', IMDNDocument.content_type) if self.parsed_message.content_type in ignored_content_types: return log.info('storing {content_type} message for account {originator} to {destination.uri}'.format(content_type=self.parsed_message.content_type, originator=account.account, destination=self.parsed_message.destination)) self.message_storage.add(account=account.account, contact=f'{self.to_header.uri.user}@{self.to_header.uri.host}', direction="outgoing", content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp), disposition_notification=self.parsed_message.disposition, message_id=self.parsed_message.message_id, state='accepted') message = sylkrtc.AccountSyncEvent(account=account.account, type='message', action='add', content=sylkrtc.AccountMessageRequest( transaction='1', account=account.account, uri=f'{self.to_header.uri.user}@{self.to_header.uri.host}', message_id=self.parsed_message.message_id, content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp) )) notification_center = NotificationCenter() notification_center.post_notification(name='SIPApplicationGotOutgoingAccountMessage', sender=account.account, data=message) def _store_message_for_receiver(self, account): if account is None: log.info('not storing %s message from %s to non-existent account %s' % (self.parsed_message.content_type, self.from_header.uri, '%s@%s' % (self.to_header.uri.user, self.to_header.uri.host))) return log.debug(f'processing message from {self.from_header.uri} for account {account.account}') message = None notification_center = NotificationCenter() if self.parsed_message.content_type == "application/im-iscomposing+xml": return if self.parsed_message.content_type == IMDNDocument.content_type: document = IMDNDocument.parse(self.parsed_message.content) imdn_message_id = document.message_id.value imdn_status = document.notification.status.__str__() imdn_datetime = document.datetime.__str__() log.info('storing IMDN message ({status}) from {originator.uri}'.format(status=imdn_status, originator=self.parsed_message.sender)) self.message_storage.update(account=account.account, state=imdn_status, message_id=imdn_message_id) self.message_storage.update(account=str(self.parsed_message.sender.uri), state=imdn_status, message_id=imdn_message_id) message = sylkrtc.AccountDispositionNotificationEvent(account=account.account, state=imdn_status, message_id=imdn_message_id, message_timestamp=imdn_datetime, timestamp=str(self.parsed_message.timestamp), code=200, reason='') imdn_message_event = message.__data__ # del imdn_message_event['account'] ## Maybe prevent multiple imdn rows? self.message_storage.add(account=account.account, contact=self.parsed_message.sender.uri, direction="incoming", content=json.dumps(imdn_message_event), content_type='message/imdn', timestamp=str(self.parsed_message.timestamp), disposition_notification='', message_id=self.parsed_message.message_id, state='received') notification_center.post_notification(name='SIPApplicationGotAccountDispositionNotification', sender=account.account, data=NotificationData(message=message, sender=self.parsed_message.sender)) else: log.info('storing {content_type} message from {originator.uri} for account {account}'.format(content_type=self.parsed_message.content_type, originator=self.parsed_message.sender, account=account.account)) self.message_storage.add(account=account.account, contact=str(self.parsed_message.sender.uri), direction='incoming', content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp), disposition_notification=self.parsed_message.disposition, message_id=self.parsed_message.message_id, state='received') message = sylkrtc.AccountMessageEvent(account=account.account, sender=self.parsed_message.sender, content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp), disposition_notification=self.parsed_message.disposition, message_id=self.parsed_message.message_id) notification_center.post_notification(name='SIPApplicationGotAccountMessage', sender=account.account, data=message) if self.parsed_message.content_type == 'text/plain' or self.parsed_message.content_type == 'text/html': def get_unread_messages(messages, originator): unread = 1 for message in messages: if ((message.content_type == 'text/plain' or message.content_type == 'text/html') and message.direction == 'incoming' and message.contact != account.account and 'display' in message.disposition): unread += 1 # log.info(f'{message.disposition} {message.contact} {message.state}') # log.info(f'there are {unread} messages') push.message(originator=originator, destination=account.account, call_id=str(uuid.uuid4()), badge=unread) messages = self.message_storage[[account.account, '']] if isinstance(messages, defer.Deferred): messages.addCallback(lambda result: get_unread_messages(messages=result, originator=message.sender)) else: get_unread_messages(messages=messages, originator=message.sender) def incoming_message(self, data, content_type=None): self.content_type = content_type if content_type is not None else data.headers.get('Content-Type', Null).content_type self.from_header = data.headers.get('From', Null) self.to_header = data.headers.get('To', Null) self.body = data.body self.from_sip = data.headers.get('X-Sylk-From-Sip', Null) self._parse_message() if self.parsed_message.content_type == 'application/sylk-api-token': self._handle_generate_token() return if self.parsed_message.content_type == 'application/sylk-api-pgp-key-lookup': self._handle_lookup_pgp_key() return if self.parsed_message.content_type == 'application/sylk-api-message-remove': self._handle_message_remove() return if self.from_sip is not Null: log.debug("message is originating from SIP endpoint") sender_account = self.message_storage.get_account(f'{self.from_header.uri.user}@{self.from_header.uri.host}') if isinstance(sender_account, defer.Deferred): sender_account.addCallback(lambda result: self._store_message_for_sender(result)) else: self._store_message_for_sender(sender_account) account = self.message_storage.get_account(f'{self.to_header.uri.user}@{self.to_header.uri.host}') if isinstance(account, defer.Deferred): account.addCallback(lambda result: self._store_message_for_receiver(result)) else: self._store_message_for_receiver(account) @run_in_green_thread def _outgoing_message(self, to_uri, from_uri, content, content_type='text/plain', headers=[], route=None, message_type=Message, subscribe=True): if not route: return from_uri = SIPURI.parse('%s' % from_uri) to_uri = SIPURI.parse('%s' % to_uri) content = content if isinstance(content, bytes) else content.encode() message_request = message_type(FromHeader(from_uri), ToHeader(to_uri), RouteHeader(route.uri), content_type, content, extra_headers=headers) if subscribe: notification_center = NotificationCenter() notification_center.add_observer(self, sender=message_request) message_request.send() @run_in_green_thread def outgoing_message(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]): route = self._lookup_sip_target_route(uri) if route: if identity is None: identity = f'sip:sylkserver@{SIPConfig.local_ip}' log.info("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route)) headers = [Header('X-Sylk-To-Sip', 'yes')] + extra_headers self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route) @run_in_green_thread def outgoing_message_to_self(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]): route = Route(address=SIPConfig.local_ip, port=SIPConfig.local_tcp_port, transport='tcp') if route: if identity is None: identity = f'sip:sylkserver@{SIPConfig.local_ip}' log.debug("sending message from '%s' to '%s' to self %s" % (identity, uri, route)) headers = [Header('X-Sylk-From-Sip', 'yes')] + extra_headers self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route, subscribe=False) @run_in_green_thread def outgoing_replicated_message(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]): route = self._lookup_sip_target_route(identity) if route: if identity is None: identity = f'sip:sylkserver@{SIPConfig.local_ip}' log.info("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route)) headers = [Header('X-Sylk-To-Sip', 'yes'), Header('X-Replicated-Message', 'yes')] + extra_headers self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route, message_type=ReplicatedMessage) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) log.info('outgoing message was accepted by remote party') def _NH_SIPMessageDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) data = notification.data reason = data.reason.decode() if isinstance(data.reason, bytes) else data.reason log.warning('could not deliver outgoing message %d %s' % (data.code, reason)) + +@implementer(IObserver) +class FileTransferHandler(object): + def __init__(self): + self.session = None + self.stream = None + self.handler = None + self.direction = None + + def init_incoming(self, stream): + self.direction = 'incoming' + self.stream = stream + self.session = stream.session + self.handler = stream.handler + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self.stream) + notification_center.add_observer(self, sender=self.handler) + + @run_in_green_thread + def init_outgoing(self, destination, file): + self.direction = 'outgoing' + + def _terminate(self, failure_reason=None): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self.stream) + notification_center.remove_observer(self, sender=self.handler) + + if failure_reason is None: + if self.direction == 'incoming' and self.stream.direction == 'recvonly': + if not hasattr(self.session, 'metadata'): + return + + metadata = self.session.metadata + filepath = os.path.join(metadata.path, metadata.filename) + meta_filepath = os.path.join(metadata.path, f'meta-{metadata.filename}') + + try: + with open(meta_filepath, 'w+') as output_file: + output_file.write(json.dumps(metadata.__data__)) + except (OSError, IOError): + unlink(meta_filepath) + log.warning('Could not save metadata %s' % meta_filepath) + + log.info('File transfer finished, saved to %s' % filepath) + settings = SIPSimpleSettings() + stripped_path = os.path.relpath(metadata.path, f'{settings.file_transfer.directory.normalized}/{metadata.prefix}') + file_path = urllib.parse.quote(f'webrtcgateway/filetransfer/{stripped_path}/{metadata.filename}') + file_url = f'{server.url}/{file_path}' + + payload = 'File transfer available at %s (%s)' % (file_url, self.format_file_size(metadata.filesize)) + message_handler = MessageHandler() + message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}') + message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}') + message_handler.outgoing_message_to_self(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}') + else: + pass + + self.session = None + self.stream = None + self.handler = None + + @staticmethod + def format_file_size(size): + infinite = float('infinity') + boundaries = [( 1024, '%d bytes', 1), + ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), + ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), + (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] + for boundary, format, divisor in boundaries: + if size < boundary: + return format % (size/divisor,) + else: + return "%d bytes" % size + + @run_in_twisted_thread + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_MediaStreamDidNotInitialize(self, notification): + self._terminate(failure_reason=notification.data.reason) + + def _NH_FileTransferHandlerDidEnd(self, notification): + if self.direction == 'incoming': + if self.stream.direction == 'sendonly': + reactor.callLater(3, self.session.end) + else: + reactor.callLater(1, self.session.end) + else: + self.session.end() + self._terminate(failure_reason=notification.data.reason) diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py index 45d2bb6..4ffd697 100644 --- a/sylk/applications/webrtcgateway/models/sylkrtc.py +++ b/sylk/applications/webrtcgateway/models/sylkrtc.py @@ -1,621 +1,632 @@ from application.python import subclasses from .jsonobjects import BooleanProperty, IntegerProperty, StringProperty, ArrayProperty, ObjectProperty, FixedValueProperty, LimitedChoiceProperty, AbstractObjectProperty, AbstractProperty from .jsonobjects import JSONObject, JSONArray, StringArray, CompositeValidator from .validators import AORValidator, DisplayNameValidator, LengthValidator, UniqueItemsValidator from sipsimple.util import ISOTimestamp # Base models (these are abstract and should not be used directly) class SylkRTCRequestBase(JSONObject): transaction = StringProperty() class SylkRTCResponseBase(JSONObject): transaction = StringProperty() class AccountRequestBase(SylkRTCRequestBase): account = StringProperty(validator=AORValidator()) class SessionRequestBase(SylkRTCRequestBase): session = StringProperty() class VideoroomRequestBase(SylkRTCRequestBase): session = StringProperty() class AccountEventBase(JSONObject): sylkrtc = FixedValueProperty('account-event') account = StringProperty(validator=AORValidator()) class SessionEventBase(JSONObject): sylkrtc = FixedValueProperty('session-event') session = StringProperty() class VideoroomEventBase(JSONObject): sylkrtc = FixedValueProperty('videoroom-event') session = StringProperty() class AccountRegistrationStateEvent(AccountEventBase): event = FixedValueProperty('registration-state') class SessionStateEvent(SessionEventBase): event = FixedValueProperty('state') class VideoroomSessionStateEvent(VideoroomEventBase): event = FixedValueProperty('session-state') # Miscellaneous models class Header(JSONObject): name = StringProperty() value = StringProperty() class Headers(JSONArray): item_type = Header class SIPIdentity(JSONObject): uri = StringProperty(validator=AORValidator()) display_name = StringProperty(optional=True, validator=DisplayNameValidator()) class ICECandidate(JSONObject): candidate = StringProperty() sdpMLineIndex = IntegerProperty() sdpMid = StringProperty() class ICECandidates(JSONArray): item_type = ICECandidate class AORList(StringArray): list_validator = UniqueItemsValidator() item_validator = AORValidator() class VideoroomPublisher(JSONObject): id = StringProperty() uri = StringProperty(validator=AORValidator()) display_name = StringProperty(optional=True) class VideoroomPublishers(JSONArray): item_type = VideoroomPublisher class VideoroomActiveParticipants(StringArray): list_validator = CompositeValidator(UniqueItemsValidator(), LengthValidator(maximum=2)) class VideoroomSessionOptions(JSONObject): audio = BooleanProperty(optional=True) video = BooleanProperty(optional=True) bitrate = IntegerProperty(optional=True) class VideoroomRaisedHands(StringArray): list_validator = UniqueItemsValidator() class SharedFile(JSONObject): filename = StringProperty() filesize = IntegerProperty() uploader = ObjectProperty(SIPIdentity) # type: SIPIdentity session = StringProperty() class SharedFiles(JSONArray): item_type = SharedFile +class TransferredFile(JSONObject): + filename = StringProperty() + filesize = IntegerProperty() + sender = ObjectProperty(SIPIdentity) # type: SIPIdentity + receiver = ObjectProperty(SIPIdentity) + transfer_id = StringProperty() + prefix = StringProperty() + path = StringProperty() + timestamp = StringProperty() + + class DispositionNotifications(StringArray): list_validator = UniqueItemsValidator() class Message(JSONObject): contact = StringProperty(validator=AORValidator()) timestamp = StringProperty() disposition = ArrayProperty(DispositionNotifications, optional=True) message_id = StringProperty() content_type = StringProperty() content = StringProperty() direction = StringProperty(optional=True) state = LimitedChoiceProperty(['delivered', 'failed', 'displayed', 'forbidden', 'error', 'accepted', 'pending', 'received'], optional=True) def __init__(self, **kw): if 'msg_timestamp' in kw: kw['timestamp'] = str(ISOTimestamp(kw['msg_timestamp'])) del kw['msg_timestamp'] super(Message, self).__init__(**kw) class ContactMessages(JSONArray): item_type = Message class MessageHistoryData(JSONObject): account = StringProperty(validator=AORValidator()) messages = ArrayProperty(ContactMessages) class AccountMessageRemoveEventData(JSONObject): contact = StringProperty() message_id = StringProperty() class AccountMarkConversationReadEventData(JSONObject): contact = StringProperty() class AccountConversationRemoveEventData(JSONObject): contact = StringProperty() class AccountDispositionNotificationEventData(JSONObject): message_id = StringProperty() state = LimitedChoiceProperty(['accepted', 'delivered', 'displayed', 'failed', 'processed', 'stored', 'forbidden', 'error']) message_timstamp = StringProperty() code = IntegerProperty() reason = StringProperty() class IncomingHeaderPrefixes(StringArray): list_validator = UniqueItemsValidator() # Response models class AckResponse(SylkRTCResponseBase): sylkrtc = FixedValueProperty('ack') class ErrorResponse(SylkRTCResponseBase): sylkrtc = FixedValueProperty('error') error = StringProperty() # Connection events class ReadyEvent(JSONObject): sylkrtc = FixedValueProperty('ready-event') class LookupPublicKeyEvent(JSONObject): sylkrtc = FixedValueProperty('lookup-public-key-event') uri = StringProperty(validator=AORValidator()) public_key = StringProperty(optional=True) # Account events class AccountIncomingSessionEvent(AccountEventBase): event = FixedValueProperty('incoming-session') session = StringProperty() originator = ObjectProperty(SIPIdentity) # type: SIPIdentity sdp = StringProperty() call_id = StringProperty() headers = AbstractProperty(optional=True) class AccountMissedSessionEvent(AccountEventBase): event = FixedValueProperty('missed-session') originator = ObjectProperty(SIPIdentity) # type: SIPIdentity class AccountConferenceInviteEvent(AccountEventBase): event = FixedValueProperty('conference-invite') room = StringProperty(validator=AORValidator()) session_id = StringProperty() originator = ObjectProperty(SIPIdentity) # type: SIPIdentity class AccountMessageEvent(AccountEventBase): event = FixedValueProperty('message') sender = ObjectProperty(SIPIdentity) # type: SIPIdentity timestamp = StringProperty() disposition_notification = ArrayProperty(DispositionNotifications, optional=True) message_id = StringProperty() content_type = StringProperty() content = StringProperty() direction = StringProperty(optional=True) class AccountDispositionNotificationEvent(AccountEventBase): event = FixedValueProperty('disposition-notification') message_id = StringProperty() message_timestamp = StringProperty() state = LimitedChoiceProperty(['accepted', 'delivered', 'displayed', 'failed', 'processed', 'stored', 'forbidden', 'error']) code = IntegerProperty() reason = StringProperty() class AccountSyncConversationsEvent(AccountEventBase): event = FixedValueProperty('sync-conversations') messages = ArrayProperty(ContactMessages) class AccountSyncEvent(AccountEventBase): event = FixedValueProperty('sync') type = StringProperty() action = StringProperty() content = AbstractObjectProperty() class AccountRegisteringEvent(AccountRegistrationStateEvent): state = FixedValueProperty('registering') class AccountRegisteredEvent(AccountRegistrationStateEvent): state = FixedValueProperty('registered') class AccountRegistrationFailedEvent(AccountRegistrationStateEvent): state = FixedValueProperty('failed') reason = StringProperty(optional=True) # Session events class SessionProgressEvent(SessionStateEvent): state = FixedValueProperty('progress') class SessionEarlyMediaEvent(SessionStateEvent): state = FixedValueProperty('early-media') sdp = StringProperty(optional=True) call_id = StringProperty(optional=True) class SessionAcceptedEvent(SessionStateEvent): state = FixedValueProperty('accepted') sdp = StringProperty(optional=True) # missing for incoming sessions call_id = StringProperty(optional=True) headers = AbstractProperty(optional=True) class SessionEstablishedEvent(SessionStateEvent): state = FixedValueProperty('established') class SessionTerminatedEvent(SessionStateEvent): state = FixedValueProperty('terminated') reason = StringProperty(optional=True) class SessionMessageEvent(SessionEventBase): event = FixedValueProperty('message') sender = ObjectProperty(SIPIdentity) # type: SIPIdentity timestamp = StringProperty() disposition_notification = ArrayProperty(DispositionNotifications, optional=True) message_id = StringProperty() content_type = StringProperty() content = StringProperty() direction = StringProperty(optional=True) class SessionMessageDispositionNotificationEvent(SessionEventBase): event = FixedValueProperty('disposition-notification') message_id = StringProperty() message_timestamp = StringProperty() state = LimitedChoiceProperty(['accepted', 'delivered', 'displayed', 'failed', 'processed', 'stored', 'forbidden', 'error']) code = IntegerProperty() reason = StringProperty() # Video room events class VideoroomConfigureEvent(VideoroomEventBase): event = FixedValueProperty('configure') originator = StringProperty() active_participants = ArrayProperty(VideoroomActiveParticipants) # type: VideoroomActiveParticipants class VideoroomSessionProgressEvent(VideoroomSessionStateEvent): state = FixedValueProperty('progress') class VideoroomSessionAcceptedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('accepted') sdp = StringProperty() video = BooleanProperty(optional=True, default=True) audio = BooleanProperty(optional=True, default=True) class VideoroomSessionEstablishedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('established') class VideoroomSessionTerminatedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('terminated') reason = StringProperty(optional=True) class VideoroomFeedAttachedEvent(VideoroomEventBase): event = FixedValueProperty('feed-attached') feed = StringProperty() sdp = StringProperty() class VideoroomFeedEstablishedEvent(VideoroomEventBase): event = FixedValueProperty('feed-established') feed = StringProperty() class VideoroomInitialPublishersEvent(VideoroomEventBase): event = FixedValueProperty('initial-publishers') publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers class VideoroomPublishersJoinedEvent(VideoroomEventBase): event = FixedValueProperty('publishers-joined') publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers class VideoroomPublishersLeftEvent(VideoroomEventBase): event = FixedValueProperty('publishers-left') publishers = ArrayProperty(StringArray) # type: StringArray class VideoroomFileSharingEvent(VideoroomEventBase): event = FixedValueProperty('file-sharing') files = ArrayProperty(SharedFiles) # type: SharedFiles class VideoroomMessageEvent(VideoroomEventBase): event = FixedValueProperty('message') type = LimitedChoiceProperty(['normal', 'status']) content = StringProperty() content_type = StringProperty() sender = ObjectProperty(SIPIdentity) # type: SIPIdentity timestamp = StringProperty() class VideoroomComposingIndicationEvent(VideoroomEventBase): event = FixedValueProperty('composing-indication') state = StringProperty() refresh = IntegerProperty() content_type = StringProperty() sender = ObjectProperty(SIPIdentity) # type: SIPIdentity class VideoroomMessageDeliveryEvent(VideoroomEventBase): event = FixedValueProperty('message-delivery') message_id = StringProperty() delivered = BooleanProperty() code = IntegerProperty() reason = StringProperty() class VideoroomMuteAudioEvent(VideoroomEventBase): event = FixedValueProperty('mute-audio') originator = StringProperty() class VideoroomRaisedHandsEvent(VideoroomEventBase): event = FixedValueProperty('raised-hands') raised_hands = ArrayProperty(VideoroomRaisedHands) # Ping request model, can be used to check connectivity from client class PingRequest(SylkRTCRequestBase): sylkrtc = FixedValueProperty('ping') # Lookup Public key model class LookupPublicKeyRequest(SylkRTCRequestBase): sylkrtc = FixedValueProperty('lookup-public-key') uri = StringProperty(validator=AORValidator()) # Account request models class AccountAddRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-add') password = StringProperty(validator=LengthValidator(minimum=1, maximum=9999)) display_name = StringProperty(optional=True) user_agent = StringProperty(optional=True) incoming_header_prefixes = ArrayProperty(IncomingHeaderPrefixes, optional=True) class AccountRemoveRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-remove') class AccountRegisterRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-register') class AccountUnregisterRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-unregister') class AccountDeviceTokenRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-devicetoken') token = StringProperty() platform = StringProperty() device = StringProperty() silent = BooleanProperty(default=False) app = StringProperty() class AccountMessageRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-message') uri = StringProperty(validator=AORValidator()) message_id = StringProperty() content = StringProperty() content_type = StringProperty() timestamp = StringProperty() class AccountDispositionNotificationRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-disposition-notification') uri = StringProperty(validator=AORValidator()) message_id = StringProperty() state = LimitedChoiceProperty(['delivered', 'failed', 'displayed', 'forbidden', 'error']) timestamp = StringProperty() class AccountSyncConversationsRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-sync-conversations') message_id = StringProperty(optional=True) class AccountMarkConversationReadRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-mark-conversation-read') contact = StringProperty(validator=AORValidator()) class AccountMessageRemoveRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-remove-message') message_id = StringProperty() contact = StringProperty(validator=AORValidator()) class AccountConversationRemoveRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-remove-conversation') contact = StringProperty(validator=AORValidator()) # Session request models class SessionCreateRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-create') account = StringProperty(validator=AORValidator()) uri = StringProperty(validator=AORValidator()) sdp = StringProperty() headers = ArrayProperty(Headers, optional=True) class SessionAnswerRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-answer') sdp = StringProperty() headers = ArrayProperty(Headers, optional=True) class SessionTrickleRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-trickle') candidates = ArrayProperty(ICECandidates) # type: ICECandidates class SessionTerminateRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-terminate') class SessionMessageRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-message') message_id = StringProperty() content = StringProperty() content_type = StringProperty() timestamp = StringProperty() # Videoroom request models class VideoroomJoinRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-join') account = StringProperty(validator=AORValidator()) uri = StringProperty(validator=AORValidator()) sdp = StringProperty() audio = BooleanProperty(optional=True, default=True) video = BooleanProperty(optional=True, default=True) class VideoroomLeaveRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-leave') class VideoroomConfigureRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-configure') active_participants = ArrayProperty(VideoroomActiveParticipants) # type: VideoroomActiveParticipants class VideoroomFeedAttachRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-attach') publisher = StringProperty() feed = StringProperty() class VideoroomFeedAnswerRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-answer') feed = StringProperty() sdp = StringProperty() class VideoroomFeedDetachRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-detach') feed = StringProperty() class VideoroomInviteRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-invite') participants = ArrayProperty(AORList) # type: AORList class VideoroomSessionTrickleRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-session-trickle') candidates = ArrayProperty(ICECandidates) # type: ICECandidates class VideoroomSessionUpdateRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-session-update') options = ObjectProperty(VideoroomSessionOptions) # type: VideoroomSessionOptions class VideoroomMessageRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-message') message_id = StringProperty() content = StringProperty() content_type = StringProperty() class VideoroomComposingIndicationRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-composing-indication') state = LimitedChoiceProperty(['active', 'idle']) refresh = IntegerProperty(optional=True) class VideoroomMuteAudioParticipantsRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-mute-audio-participants') class VideoroomToggleHandRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-toggle-hand') session_id = StringProperty(optional=True) # SylkRTC request to model mapping class ProtocolError(Exception): pass class SylkRTCRequest(object): __classmap__ = {cls.sylkrtc.value: cls for cls in subclasses(SylkRTCRequestBase) if hasattr(cls, 'sylkrtc')} @classmethod def from_message(cls, message): try: request_type = message['sylkrtc'] except KeyError: raise ProtocolError('could not get WebSocket message type') try: request_class = cls.__classmap__[request_type] except KeyError: raise ProtocolError('unknown WebSocket request: %s' % request_type) return request_class(**message) diff --git a/sylk/applications/webrtcgateway/web.py b/sylk/applications/webrtcgateway/web.py index 3dc0687..8e14927 100644 --- a/sylk/applications/webrtcgateway/web.py +++ b/sylk/applications/webrtcgateway/web.py @@ -1,249 +1,282 @@ import json +import os from application.python.types import Singleton from autobahn.twisted.resource import WebSocketResource +from sipsimple.configuration.settings import SIPSimpleSettings from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.web.server import Site from werkzeug.exceptions import Forbidden, NotFound from werkzeug.utils import secure_filename from sylk import __version__ as sylk_version from sylk.resources import Resources from sylk.web import File, Klein, StaticFileResource, server from . import push from .configuration import GeneralConfig, JanusConfig from .factory import SylkWebSocketServerFactory from .janus import JanusBackend from .logger import log from .models import sylkrtc from .protocol import SYLK_WS_PROTOCOL from .storage import TokenStorage, MessageStorage __all__ = 'WebHandler', 'AdminWebHandler' class FileUploadRequest(object): def __init__(self, shared_file, content): self.deferred = defer.Deferred() self.shared_file = shared_file self.content = content self.had_error = False class ApiTokenAuthError(Exception): pass class WebRTCGatewayWeb(object, metaclass=Singleton): app = Klein() def __init__(self, ws_factory): self._resource = self.app.resource() self._ws_resource = WebSocketResource(ws_factory) self._ws_factory = ws_factory @property def resource(self): return self._resource @app.route('/', branch=True) def index(self, request): return StaticFileResource(Resources.get('html/webrtcgateway/')) @app.route('/ws') def ws(self, request): return self._ws_resource @app.route('/filesharing///', methods=['OPTIONS', 'POST', 'GET']) def filesharing(self, request, conference, session_id, filename): conference_uri = conference.lower() if conference_uri in self._ws_factory.videorooms: videoroom = self._ws_factory.videorooms[conference_uri] if session_id in videoroom: request.setHeader('Access-Control-Allow-Origin', '*') request.setHeader('Access-Control-Allow-Headers', 'content-type') method = request.method.upper().decode() session = videoroom[session_id] if method == 'POST': def log_result(result): if isinstance(result, Failure): videoroom.log.warning('{file.uploader.uri} failed to upload {file.filename}: {error}'.format(file=upload_request.shared_file, error=result.value)) else: videoroom.log.info('{file.uploader.uri} has uploaded {file.filename}'.format(file=upload_request.shared_file)) return result filename = secure_filename(filename) filesize = int(request.getHeader('Content-Length')) shared_file = sylkrtc.SharedFile(filename=filename, filesize=filesize, uploader=dict(uri=session.account.id, display_name=session.account.display_name), session=session_id) session.owner.log.info('wants to upload file {filename} to video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) upload_request = FileUploadRequest(shared_file, request.content) videoroom.add_file(upload_request) upload_request.deferred.addBoth(log_result) return upload_request.deferred elif method == 'GET': filename = secure_filename(filename) session.owner.log.info('wants to download file {filename} from video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) try: path = videoroom.get_file(filename) except LookupError as e: videoroom.log.warning('{session.account.id} failed to download {filename}: {error}'.format(session=session, filename=filename, error=e)) raise NotFound() else: videoroom.log.info('{session.account.id} is downloading {filename}'.format(session=session, filename=filename)) request.setHeader('Content-Disposition', 'attachment;filename=%s' % filename) return File(path) else: return 'OK' raise Forbidden() + @app.route('/filetransfer////', methods=['GET']) + def filetransfer(self, request, sender, receiver, transfer_id, filename): + filename = secure_filename(filename) + settings = SIPSimpleSettings() + folder = os.path.join(settings.file_transfer.directory.normalized, sender[:1], sender, receiver, transfer_id) + path = '%s/%s' % (folder, filename) + log_path = os.path.join(sender, receiver, transfer_id, filename) + if os.path.exists(path): + file_size = os.path.getsize(path) + split_tup = os.path.splitext(path) + file_extension = split_tup[1] + render_type = 'inline' if file_extension and file_extension.lower() in ('.jpg', '.png', '.jpeg', '.gif') else 'attachment' + request.setHeader('Content-Disposition', '%s;filename=%s' % (render_type, filename)) + log.info('Web %s file download %s (%s)' % (render_type, log_path, self.format_file_size(file_size))) + return File(path) + else: + log.warning(f'File not found: {path}') + raise NotFound() + def verify_api_token(self, request, account, msg_id, token=None): if token: auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) if auth_headers: try: method, auth_token = auth_headers[0].split() except ValueError: log.warning(f'Authorization headers is not correct for message history request for {account}, it should be in the format: Apikey [TOKEN]') else: log.warning(f'Authorization headers missing on message history request for {account}') if not auth_headers or method != 'Apikey' or auth_token != token: log.warning(f'Token authentication error for {account}') raise ApiTokenAuthError() else: log.info(f'Returning message history for {account}') return self.get_account_messages(request, account, msg_id) else: log.warning(f'Token not found for {account}') raise ApiTokenAuthError() def tokenError(self, error, request): raise ApiTokenAuthError() def get_account_messages(self, request, account, msg_id=None): account = account.lower() storage = MessageStorage() messages = storage[[account, msg_id]] request.setHeader('Content-Type', 'application/json') if isinstance(messages, defer.Deferred): return messages.addCallback(lambda result: json.dumps(sylkrtc.MessageHistoryData(account=account, messages=result).__data__)) @app.handle_errors(ApiTokenAuthError) def auth_error(self, request, failure): request.setResponseCode(401) return b'Unauthorized' @app.route('/messages/history/', methods=['OPTIONS', 'GET']) @app.route('/messages/history//', methods=['OPTIONS', 'GET']) def messages(self, request, account, msg_id=None): storage = MessageStorage() token = storage.get_account_token(account) if isinstance(token, defer.Deferred): token.addCallback(lambda result: self.verify_api_token(request, account, msg_id, result)) return token else: return self.verify_api_token(request, account, msg_id, token) + @staticmethod + def format_file_size(size): + infinite = float('infinity') + boundaries = [( 1024, '%d bytes', 1), + ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), + ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), + (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] + for boundary, format, divisor in boundaries: + if size < boundary: + return format % (size/divisor,) + else: + return "%d bytes" % size class WebHandler(object): def __init__(self): self.backend = None self.factory = None self.resource = None self.web = None def start(self): ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws' self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version) self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins, allowNullOrigin=GeneralConfig.web_origins == ['*'], autoPingInterval=GeneralConfig.websocket_ping_interval, autoPingTimeout=GeneralConfig.websocket_ping_interval/2) self.web = WebRTCGatewayWeb(self.factory) server.register_resource(b'webrtcgateway', self.web.resource) log.info('WebSocket handler started at %s' % ws_url) log.info('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins)) log.info('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains)) log.info('Using Janus API: %s' % JanusConfig.api_url) self.backend = JanusBackend() self.backend.start() def stop(self): if self.factory is not None: for conn in self.factory.connections.copy(): conn.dropConnection(abort=True) self.factory = None if self.backend is not None: self.backend.stop() self.backend = None # TODO: This implementation is a prototype. Moving forward it probably makes sense to provide admin API # capabilities for other applications too. This could be done in a number of ways: # # * On the main web server, under a /admin/ parent route. # * On a separate web server, which could listen on a different IP and port. # # In either case, HTTPS aside, a token based authentication mechanism would be desired. # Which one is best is not 100% clear at this point. class AuthError(Exception): pass class AdminWebHandler(object, metaclass=Singleton): app = Klein() def __init__(self): self.listener = None def start(self): host, port = GeneralConfig.http_management_interface # noinspection PyUnresolvedReferences self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host) log.info('Admin web handler started at http://%s:%d' % (host, port)) def stop(self): if self.listener is not None: self.listener.stopListening() self.listener = None # Admin web API def _check_auth(self, request): auth_secret = GeneralConfig.http_management_auth_secret if auth_secret: auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) if not auth_headers or auth_headers[0] != auth_secret: raise AuthError() @app.handle_errors(AuthError) def auth_error(self, request, failure): request.setResponseCode(403) return 'Authentication error' @app.route('/tokens/') def get_tokens(self, request, account): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() tokens = storage[account] if isinstance(tokens, defer.Deferred): return tokens.addCallback(lambda result: json.dumps({'tokens': result})) else: return json.dumps({'tokens': tokens}) @app.route('/tokens//', methods=['DELETE']) def process_token(self, request, account, device_token): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() if request.method == 'DELETE': storage.remove(account, device_token) return json.dumps({'success': True})