Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/webrtcgateway/sip_handlers.py b/sylk/applications/webrtcgateway/sip_handlers.py
index c83a9ad..c1a4f2f 100644
--- a/sylk/applications/webrtcgateway/sip_handlers.py
+++ b/sylk/applications/webrtcgateway/sip_handlers.py
@@ -1,508 +1,509 @@
import json
import random
import os
import secrets
import uuid
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.system import unlink
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI, FromHeader, ToHeader, Message, Request, RouteHeader, Route, Header
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads.imdn import IMDNDocument
from sipsimple.payloads.rcsfthttp import FTHTTPDocument
from sipsimple.streams.msrp.chat import CPIMPayload, Message as SIPMessage
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor, defer
from zope.interface import implementer
from sylk.configuration import SIPConfig
from sylk.web import server
from . import push
from .configuration import GeneralConfig
from .logger import log
from .models import sylkrtc
from .storage import MessageStorage
from .datatypes import FileTransferData
class ParsedSIPMessage(SIPMessage):
__slots__ = 'message_id', 'disposition', 'destination'
def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, message_id=None, disposition=None, destination=None):
super(ParsedSIPMessage, self).__init__(content, content_type, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers)
self.message_id = message_id
self.disposition = disposition
self.destination = destination
@implementer(IObserver)
class ReplicatedMessage(Message):
def __init__(self, from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=None):
super(ReplicatedMessage, self).__init__(from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=None)
self._request = Request("MESSAGE", from_header.uri, from_header, to_header, route_header, credentials=credentials, extra_headers=extra_headers, content_type=content_type, body=body if isinstance(body, bytes) else body.encode())
@implementer(IObserver)
class MessageHandler(object):
def __init__(self):
self.message_storage = MessageStorage()
self.resolver = DNSLookup()
self.from_header = None
self.to_header = None
self.content_type = None
self.from_sip = None
self.body = None
self.parsed_message = None
def _lookup_sip_target_route(self, uri):
proxy = GeneralConfig.outbound_sip_proxy
if proxy is not None:
sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport})
else:
sip_uri = SIPURI.parse('sip:%s' % uri)
settings = SIPSimpleSettings()
try:
routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait()
except DNSLookupError as e:
raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e))
if not routes:
raise DNSLookupError('DNS lookup error: no results found')
route = random.choice([r for r in routes if r.transport == routes[0].transport])
log.debug('DNS lookup for SIP message proxy for {} yielded {}'.format(uri, route))
return route
def _parse_message(self):
cpim_message = None
if self.content_type == "message/cpim":
cpim_message = CPIMPayload.decode(self.body)
body = cpim_message.content if isinstance(cpim_message.content, str) else cpim_message.content.decode()
content_type = cpim_message.content_type
sender = cpim_message.sender or self.from_header
disposition = next(([item.strip() for item in header.value.split(',')] for header in cpim_message.additional_headers if header.name == 'Disposition-Notification'), None)
message_id = next((header.value for header in cpim_message.additional_headers if header.name == 'Message-ID'), str(uuid.uuid4()))
else:
body = self.body.decode('utf-8')
sender = self.from_header
disposition = None
message_id = str(uuid.uuid4())
content_type = str(self.content_type)
timestamp = str(cpim_message.timestamp) if cpim_message is not None and cpim_message.timestamp is not None else str(ISOTimestamp.now())
sender = sylkrtc.SIPIdentity(uri=str(sender.uri), display_name=sender.display_name)
destination = sylkrtc.SIPIdentity(uri=str(self.to_header.uri), display_name=self.to_header.display_name)
if content_type == FTHTTPDocument.content_type:
document = FTHTTPDocument.parse(body)
for info in document:
if info.type == 'file':
transfer_data = FileTransferData(info.file_name.value,
int(info.file_size.value),
info.content_type.value,
message_id,
sender.uri,
destination.uri,
url=info.data.url)
metadata = sylkrtc.TransferredFile(**transfer_data.__dict__)
body = json.dumps(sylkrtc.FileTransferMessage(**metadata.__data__).__data__)
content_type = 'application/sylk-file-transfer'
self.parsed_message = ParsedSIPMessage(body, content_type, sender=sender, disposition=disposition, message_id=message_id, timestamp=timestamp, destination=destination)
def _send_public_key(self, from_header, to_header, public_key):
if public_key:
self.outgoing_message(from_header, public_key, 'text/pgp-public-key', str(to_header))
def _handle_generate_token(self):
account = f'{self.from_header.uri.user}@{self.from_header.uri.host}'
log.info(f'Adding {account} for storing messages')
self.message_storage.add_account(account)
token = secrets.token_urlsafe()
self.outgoing_message(self.from_header.uri, json.dumps({'token': token, 'url': f'{server.url}/webrtcgateway/messages/history/{account}'}), 'application/sylk-api-token')
self.message_storage.add_account_token(account=account, token=token)
def _handle_lookup_pgp_key(self):
account = f'{self.to_header.uri.user}@{self.to_header.uri.host}'
public_key = self.message_storage.get_public_key(account)
log.info(f'Public key lookup for {account}')
if isinstance(public_key, defer.Deferred):
public_key.addCallback(lambda result: self._send_public_key(self.from_header.uri, self.to_header.uri, result))
else:
self._send_public_key(self.from_header.uri, self.to_header.uri, public_key)
def _handle_message_remove(self):
account = f'{self.from_header.uri.user}@{self.from_header.uri.host}'
contact = f'{self.to_header.uri.user}@{self.to_header.uri.host}'
message_id = self.parsed_message.content
self.message_storage.removeMessage(account=account, message_id=message_id)
content = sylkrtc.AccountMessageRemoveEventData(contact=contact, message_id=message_id)
self.message_storage.add(account=account,
contact=contact,
direction='',
content=json.dumps(content.__data__),
content_type='application/sylk-message-remove',
timestamp=str(ISOTimestamp.now()),
disposition_notification='',
message_id=str(uuid.uuid4()))
event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content)
self.outgoing_message(self.from_header.uri, json.dumps(content.__data__), 'application/sylk-message-remove', str(self.to_header.uri))
notification_center = NotificationCenter()
notification_center.post_notification(name='SIPApplicationGotAccountRemoveMessage', sender=account, data=event)
def remove_message_from_receiver(msg_id, messages):
for message in messages:
if message.message_id == msg_id and message.direction == 'incoming':
account = message.account
message_id = message.message_id
self.message_storage.removeMessage(account=account, message_id=message_id)
content = sylkrtc.AccountMessageRemoveEventData(contact=message.contact, message_id=message_id)
self.message_storage.add(account=account,
contact=message.contact,
direction='',
content=json.dumps(content.__data__),
content_type='application/sylk-message-remove',
timestamp=str(ISOTimestamp.now()),
disposition_notification='',
message_id=str(uuid.uuid4()))
event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content)
notification_center.post_notification(name='SIPApplicationGotAccountRemoveMessage', sender=account, data=event)
log.info("Removed receiver message")
break
messages = self.message_storage[[contact, '']]
if isinstance(messages, defer.Deferred):
messages.addCallback(lambda result: remove_message_from_receiver(msg_id=message_id, messages=result))
else:
remove_message_from_receiver(msg_id=message_id, messages=messages)
def _store_message_for_sender(self, account):
if account is None:
log.info('not storing %s message from non-existent account %s to %s' % (self.parsed_message.content_type, self.from_header.uri, '%s@%s' % (self.to_header.uri.user, self.to_header.uri.host)))
return
log.debug(f"storage is enabled for originator {account.account}")
message = None
ignored_content_types = ("application/im-iscomposing+xml", 'text/pgp-public-key', IMDNDocument.content_type)
if self.parsed_message.content_type in ignored_content_types:
return
log.info('storing {content_type} message for account {originator} to {destination.uri}'.format(content_type=self.parsed_message.content_type, originator=account.account, destination=self.parsed_message.destination))
self.message_storage.add(account=account.account,
contact=f'{self.to_header.uri.user}@{self.to_header.uri.host}',
direction="outgoing",
content=self.parsed_message.content,
content_type=self.parsed_message.content_type,
timestamp=str(self.parsed_message.timestamp),
disposition_notification=self.parsed_message.disposition,
message_id=self.parsed_message.message_id,
state='accepted')
message = sylkrtc.AccountSyncEvent(account=account.account,
type='message',
action='add',
content=sylkrtc.AccountMessageRequest(
transaction='1',
account=account.account,
uri=f'{self.to_header.uri.user}@{self.to_header.uri.host}',
message_id=self.parsed_message.message_id,
content=self.parsed_message.content,
content_type=self.parsed_message.content_type,
timestamp=str(self.parsed_message.timestamp),
server_generated=True
))
notification_center = NotificationCenter()
notification_center.post_notification(name='SIPApplicationGotOutgoingAccountMessage', sender=account.account, data=message)
def _store_message_for_receiver(self, account):
if account is None:
log.info('not storing %s message from %s to non-existent account %s' % (self.parsed_message.content_type, self.from_header.uri, '%s@%s' % (self.to_header.uri.user, self.to_header.uri.host)))
return
log.debug(f'processing message from {self.from_header.uri} for account {account.account}')
message = None
notification_center = NotificationCenter()
if self.parsed_message.content_type == "application/im-iscomposing+xml":
return
if self.parsed_message.content_type == IMDNDocument.content_type:
document = IMDNDocument.parse(self.parsed_message.content)
imdn_message_id = document.message_id.value
imdn_status = document.notification.status.__str__()
imdn_datetime = document.datetime.__str__()
log.info('storing IMDN message ({status}) from {originator.uri}'.format(status=imdn_status, originator=self.parsed_message.sender))
self.message_storage.update(account=account.account,
state=imdn_status,
message_id=imdn_message_id)
self.message_storage.update(account=str(self.parsed_message.sender.uri),
state=imdn_status,
message_id=imdn_message_id)
message = sylkrtc.AccountDispositionNotificationEvent(account=account.account,
state=imdn_status,
message_id=imdn_message_id,
message_timestamp=imdn_datetime,
timestamp=str(self.parsed_message.timestamp),
code=200,
reason='')
imdn_message_event = message.__data__
# del imdn_message_event['account']
## Maybe prevent multiple imdn rows?
self.message_storage.add(account=account.account,
contact=self.parsed_message.sender.uri,
direction="incoming",
content=json.dumps(imdn_message_event),
content_type='message/imdn',
timestamp=str(self.parsed_message.timestamp),
disposition_notification='',
message_id=self.parsed_message.message_id,
state='received')
notification_center.post_notification(name='SIPApplicationGotAccountDispositionNotification',
sender=account.account,
data=NotificationData(message=message, sender=self.parsed_message.sender))
else:
log.info('storing {content_type} message from {originator.uri} for account {account}'.format(content_type=self.parsed_message.content_type, originator=self.parsed_message.sender, account=account.account))
self.message_storage.add(account=account.account,
contact=str(self.parsed_message.sender.uri),
direction='incoming',
content=self.parsed_message.content,
content_type=self.parsed_message.content_type,
timestamp=str(self.parsed_message.timestamp),
disposition_notification=self.parsed_message.disposition,
message_id=self.parsed_message.message_id,
state='received')
message = sylkrtc.AccountMessageEvent(account=account.account,
sender=self.parsed_message.sender,
content=self.parsed_message.content,
content_type=self.parsed_message.content_type,
timestamp=str(self.parsed_message.timestamp),
disposition_notification=self.parsed_message.disposition,
message_id=self.parsed_message.message_id)
notification_center.post_notification(name='SIPApplicationGotAccountMessage', sender=account.account, data=message)
if self.parsed_message.content_type == 'text/plain' or self.parsed_message.content_type == 'text/html':
def get_unread_messages(messages, originator):
unread = 1
for message in messages:
if ((message.content_type == 'text/plain' or message.content_type == 'text/html')
and message.direction == 'incoming' and message.contact != account.account
and 'display' in message.disposition):
unread += 1
# log.info(f'{message.disposition} {message.contact} {message.state}')
# log.info(f'there are {unread} messages')
push.message(originator=originator, destination=account.account, call_id=str(uuid.uuid4()), badge=unread)
messages = self.message_storage[[account.account, '']]
if isinstance(messages, defer.Deferred):
messages.addCallback(lambda result: get_unread_messages(messages=result, originator=message.sender))
else:
get_unread_messages(messages=messages, originator=message.sender)
def incoming_message(self, data, content_type=None):
self.content_type = content_type if content_type is not None else data.headers.get('Content-Type', Null).content_type
self.from_header = data.headers.get('From', Null)
self.to_header = data.headers.get('To', Null)
self.body = data.body
self.from_sip = data.headers.get('X-Sylk-From-Sip', Null)
self._parse_message()
if self.parsed_message.content_type == 'application/sylk-api-token':
self._handle_generate_token()
return
if self.parsed_message.content_type == 'application/sylk-api-pgp-key-lookup':
self._handle_lookup_pgp_key()
return
if self.parsed_message.content_type == 'application/sylk-api-message-remove':
self._handle_message_remove()
return
if self.from_sip is not Null:
log.debug("message is originating from SIP endpoint")
sender_account = self.message_storage.get_account(f'{self.from_header.uri.user}@{self.from_header.uri.host}')
if isinstance(sender_account, defer.Deferred):
sender_account.addCallback(lambda result: self._store_message_for_sender(result))
else:
self._store_message_for_sender(sender_account)
account = self.message_storage.get_account(f'{self.to_header.uri.user}@{self.to_header.uri.host}')
if isinstance(account, defer.Deferred):
account.addCallback(lambda result: self._store_message_for_receiver(result))
else:
self._store_message_for_receiver(account)
@run_in_green_thread
def _outgoing_message(self, to_uri, from_uri, content, content_type='text/plain', headers=[], route=None, message_type=Message, subscribe=True):
if not route:
return
from_uri = SIPURI.parse('%s' % from_uri)
to_uri = SIPURI.parse('%s' % to_uri)
content = content if isinstance(content, bytes) else content.encode()
message_request = message_type(FromHeader(from_uri),
ToHeader(to_uri),
RouteHeader(route.uri),
content_type,
content,
extra_headers=headers)
if subscribe:
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=message_request)
message_request.send()
@run_in_green_thread
def outgoing_message(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]):
route = self._lookup_sip_target_route(uri)
if route:
if identity is None:
identity = f'sip:sylkserver@{SIPConfig.local_ip}'
log.info("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route))
headers = [Header('X-Sylk-To-Sip', 'yes')] + extra_headers
self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route)
@run_in_green_thread
def outgoing_message_to_self(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]):
route = Route(address=SIPConfig.local_ip, port=SIPConfig.local_tcp_port, transport='tcp')
if route:
if identity is None:
identity = f'sip:sylkserver@{SIPConfig.local_ip}'
log.debug("sending message from '%s' to '%s' to self %s" % (identity, uri, route))
headers = [Header('X-Sylk-From-Sip', 'yes')] + extra_headers
self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route, subscribe=False)
@run_in_green_thread
def outgoing_replicated_message(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]):
route = self._lookup_sip_target_route(identity)
if route:
if identity is None:
identity = f'sip:sylkserver@{SIPConfig.local_ip}'
log.info("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route))
headers = [Header('X-Sylk-To-Sip', 'yes'), Header('X-Replicated-Message', 'yes')] + extra_headers
self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route, message_type=ReplicatedMessage)
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPMessageDidSucceed(self, notification):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=notification.sender)
log.info('outgoing message was accepted by remote party')
def _NH_SIPMessageDidFail(self, notification):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=notification.sender)
data = notification.data
reason = data.reason.decode() if isinstance(data.reason, bytes) else data.reason
log.warning('could not deliver outgoing message %d %s' % (data.code, reason))
@implementer(IObserver)
class FileTransferHandler(object):
def __init__(self):
self.session = None
self.stream = None
self.handler = None
self.direction = None
def init_incoming(self, stream):
self.direction = 'incoming'
self.stream = stream
self.session = stream.session
self.handler = stream.handler
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.stream)
notification_center.add_observer(self, sender=self.handler)
@run_in_green_thread
def init_outgoing(self, destination, file):
self.direction = 'outgoing'
def _terminate(self, failure_reason=None):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.stream)
notification_center.remove_observer(self, sender=self.handler)
if failure_reason is None:
if self.direction == 'incoming' and self.stream.direction == 'recvonly':
if not hasattr(self.session, 'transfer_data'):
return
transfer_data = self.session.transfer_data
metadata = sylkrtc.TransferredFile(**transfer_data.__dict__, hash=self.stream.file_selector.hash)
meta_filepath = os.path.join(transfer_data.path, f'meta-{metadata.filename}')
try:
with open(meta_filepath, 'w+') as output_file:
output_file.write(json.dumps(metadata.__data__))
except (OSError, IOError):
unlink(meta_filepath)
log.warning('Could not save metadata %s' % meta_filepath)
log.info('File transfer finished, saved to %s' % transfer_data.full_path)
- payload = transfer_data.message_payload
message_handler = MessageHandler()
- message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}')
- message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}')
-
cpim_payload = transfer_data.cpim_message_payload(metadata)
message_handler.outgoing_message_to_self(f'sip:{metadata.receiver.uri}', cpim_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
xml_payload = transfer_data.cpim_rcsfthttp_message_payload(metadata)
message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
+
+ if not metadata.filename.endswith('.asc'):
+ payload = transfer_data.message_payload
+ message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}')
+ message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}')
else:
pass
self.session = None
self.stream = None
self.handler = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_MediaStreamDidNotInitialize(self, notification):
self._terminate(failure_reason=notification.data.reason)
def _NH_FileTransferHandlerDidEnd(self, notification):
if self.direction == 'incoming':
if self.stream.direction == 'sendonly':
reactor.callLater(3, self.session.end)
else:
reactor.callLater(1, self.session.end)
else:
self.session.end()
self._terminate(failure_reason=notification.data.reason)
diff --git a/sylk/applications/webrtcgateway/web.py b/sylk/applications/webrtcgateway/web.py
index b8fe9a3..41bd037 100644
--- a/sylk/applications/webrtcgateway/web.py
+++ b/sylk/applications/webrtcgateway/web.py
@@ -1,373 +1,373 @@
import json
import os
import hashlib
from shutil import copyfileobj
from application.system import makedirs
from sipsimple.streams.msrp.filetransfer import FileSelector
from application.python.types import Singleton
from autobahn.twisted.resource import WebSocketResource
from sipsimple.configuration.settings import SIPSimpleSettings
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.web.server import Site
from werkzeug.exceptions import Forbidden, NotFound
from werkzeug.utils import secure_filename
from sylk import __version__ as sylk_version
from sylk.resources import Resources
from sylk.web import File, Klein, StaticFileResource, server
from .configuration import GeneralConfig, JanusConfig
from .datatypes import FileTransferData
from .factory import SylkWebSocketServerFactory
from .janus import JanusBackend
from .logger import log
from .models import sylkrtc
from .protocol import SYLK_WS_PROTOCOL
from .sip_handlers import MessageHandler
from .storage import TokenStorage, MessageStorage
__all__ = 'WebHandler', 'AdminWebHandler'
class FileUploadRequest(object):
def __init__(self, shared_file, content):
self.deferred = defer.Deferred()
self.shared_file = shared_file
self.content = content
self.had_error = False
class ApiTokenAuthError(Exception): pass
class WebRTCGatewayWeb(object, metaclass=Singleton):
app = Klein()
def __init__(self, ws_factory):
self._resource = self.app.resource()
self._ws_resource = WebSocketResource(ws_factory)
self._ws_factory = ws_factory
@property
def resource(self):
return self._resource
@app.route('/', branch=True)
def index(self, request):
return StaticFileResource(Resources.get('html/webrtcgateway/'))
@app.route('/ws')
def ws(self, request):
return self._ws_resource
@app.route('/filesharing/<string:conference>/<string:session_id>/<string:filename>', methods=['OPTIONS', 'POST', 'GET'])
def filesharing(self, request, conference, session_id, filename):
conference_uri = conference.lower()
if conference_uri in self._ws_factory.videorooms:
videoroom = self._ws_factory.videorooms[conference_uri]
if session_id in videoroom:
request.setHeader('Access-Control-Allow-Origin', '*')
request.setHeader('Access-Control-Allow-Headers', 'content-type')
method = request.method.upper().decode()
session = videoroom[session_id]
if method == 'POST':
def log_result(result):
if isinstance(result, Failure):
videoroom.log.warning('{file.uploader.uri} failed to upload {file.filename}: {error}'.format(file=upload_request.shared_file, error=result.value))
else:
videoroom.log.info('{file.uploader.uri} has uploaded {file.filename}'.format(file=upload_request.shared_file))
return result
filename = secure_filename(filename)
filesize = int(request.getHeader('Content-Length'))
shared_file = sylkrtc.SharedFile(filename=filename, filesize=filesize, uploader=dict(uri=session.account.id, display_name=session.account.display_name), session=session_id)
session.owner.log.info('wants to upload file {filename} to video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id))
upload_request = FileUploadRequest(shared_file, request.content)
videoroom.add_file(upload_request)
upload_request.deferred.addBoth(log_result)
return upload_request.deferred
elif method == 'GET':
filename = secure_filename(filename)
session.owner.log.info('wants to download file {filename} from video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id))
try:
path = videoroom.get_file(filename)
except LookupError as e:
videoroom.log.warning('{session.account.id} failed to download {filename}: {error}'.format(session=session, filename=filename, error=e))
raise NotFound()
else:
videoroom.log.info('{session.account.id} is downloading {filename}'.format(session=session, filename=filename))
request.setHeader('Content-Disposition', 'attachment;filename=%s' % filename)
return File(path)
else:
return 'OK'
raise Forbidden()
@app.route('/filetransfer/<string:sender>/<string:receiver>/<string:transfer_id>/<string:filename>', methods=['GET', 'POST', 'OPTIONS'])
def filetransfer(self, request, sender, receiver, transfer_id, filename):
request.setHeader('Access-Control-Allow-Origin', '*')
request.setHeader('Access-Control-Allow-Headers', 'content-type')
method = request.method.upper().decode()
if method == 'POST':
ip = request.getClientIP()
connection_handlers = [connection.connection_handler for connection in self._ws_factory.connections if connection.peer.split(":")[1] == ip]
sender_connection = next((connection_handler for connection_handler in connection_handlers if sender in connection_handler.accounts_map), False)
if not sender_connection:
raise Forbidden
# TODO: Form support to support extra metadata?
filename = secure_filename(filename)
filesize = int(request.getHeader('Content-Length'))
filetype = request.getHeader('Content-Type') if request.getHeader('Content-Type') else 'application/octet-stream'
transfer_data = FileTransferData(filename, filesize, filetype, transfer_id, sender, receiver, content=request.content)
message_storage = MessageStorage()
account = defer.maybeDeferred(message_storage.get_account, receiver)
account.addCallback(lambda result: self._check_receiver(result))
sender_account = defer.maybeDeferred(message_storage.get_account, sender)
sender_account.addCallback(lambda result: self._check_sender(result, transfer_data))
d1 = defer.DeferredList([account, sender_account], consumeErrors=True)
d1.addCallback(lambda result: self._handle_lookup_result(result, transfer_data, sender_connection))
return d1
elif method == 'GET':
settings = SIPSimpleSettings()
folder = os.path.join(settings.file_transfer.directory.normalized, sender[:1], sender, receiver, transfer_id)
path = f'{folder}/{filename}'
log_path = os.path.join(sender, receiver, transfer_id, filename)
if os.path.exists(path):
file_size = os.path.getsize(path)
split_tup = os.path.splitext(path)
file_extension = split_tup[1]
render_type = 'inline' if file_extension and file_extension.lower() in ('.jpg', '.png', '.jpeg', '.gif') else 'attachment'
request.setHeader('Content-Disposition', '%s;filename=%s' % (render_type, filename))
log.info('Web %s file download %s (%s)' % (render_type, log_path, FileTransferData.format_file_size(file_size)))
return File(path)
else:
log.warning('Download failed, file not found: %s' % (log_path))
raise NotFound()
else:
return 'OK'
def _check_receiver(self, account):
if account is None:
raise Exception("Receiver account for file upload not found")
def _check_sender(self, account, transfer_data):
if account is None:
transfer_data.update_path_for_receiver()
raise Exception("Sender account for file upload not found")
def _handle_lookup_result(self, result, transfer_data, connection):
reject_session = all([success is not True for (success, value) in result])
if reject_session:
self._reject_upload("Sender and receiver accounts for file upload were not found")
return
log.info('File upload from {sender.uri} to {receiver.uri} will be saved to {path}/{filename}'.format(**transfer_data.__dict__))
return self._accept_upload(transfer_data, connection)
def _reject_upload(self, error):
log.warning(f'File upload rejected: {error}')
raise NotFound()
def _accept_upload(self, transfer_data, connection):
makedirs(transfer_data.path)
with open(os.path.join(transfer_data.path, transfer_data.filename), 'wb') as output_file:
copyfileobj(transfer_data.content, output_file)
part_size = 64 * 1024
sha1 = hashlib.sha1()
with open(os.path.join(transfer_data.path, transfer_data.filename), 'rb') as f:
while True:
data = f.read(part_size)
if not data:
break
sha1.update(data)
file_selector = FileSelector.for_file(os.path.join(transfer_data.path, transfer_data.filename))
file_selector.hash = sha1
metadata = sylkrtc.TransferredFile(**transfer_data.__dict__, hash=file_selector.hash)
meta_filepath = os.path.join(transfer_data.path, f'meta-{metadata.filename}')
try:
with open(meta_filepath, 'w+') as output_file:
output_file.write(json.dumps(metadata.__data__))
except (OSError, IOError):
log.warning('Could not save metadata %s' % meta_filepath)
- payload = transfer_data.cpim_message_payload(metadata)
message_handler = MessageHandler()
+ payload = transfer_data.cpim_message_payload(metadata)
message_handler.outgoing_message_to_self(f'sip:{metadata.receiver.uri}', payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
- message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
- message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
-
- message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', transfer_data.message_payload, content_type='text/plain', identity=f'sip:{metadata.sender.uri}')
- message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', transfer_data.message_payload, content_type='text/plain', identity=f'sip:{metadata.sender.uri}')
xml_payload = transfer_data.cpim_rcsfthttp_message_payload(metadata)
message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}')
+
+ if not metadata.filename.endswith('.asc'):
+ message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', transfer_data.message_payload, content_type='text/plain', identity=f'sip:{metadata.sender.uri}')
+ message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', transfer_data.message_payload, content_type='text/plain', identity=f'sip:{metadata.sender.uri}')
+
return "OK"
def verify_api_token(self, request, account, msg_id, token=None):
# print(msg_id)
# return self.get_account_messages(request, account)
if token:
auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None)
if auth_headers:
try:
method, auth_token = auth_headers[0].split()
except ValueError:
log.warning(f'Authorization headers is not correct for message history request for {account}, it should be in the format: Apikey [TOKEN]')
else:
log.warning(f'Authorization headers missing on message history request for {account}')
if not auth_headers or method != 'Apikey' or auth_token != token:
log.warning(f'Token authentication error for {account}')
raise ApiTokenAuthError()
else:
log.info(f'Returning message history for {account}')
return self.get_account_messages(request, account, msg_id)
else:
log.warning(f'Token not found for {account}')
raise ApiTokenAuthError()
def tokenError(self, error, request):
raise ApiTokenAuthError()
def get_account_messages(self, request, account, msg_id=None):
account = account.lower()
storage = MessageStorage()
messages = storage[[account, msg_id]]
request.setHeader('Content-Type', 'application/json')
if isinstance(messages, defer.Deferred):
return messages.addCallback(lambda result:
json.dumps(sylkrtc.MessageHistoryData(account=account, messages=result).__data__))
@app.handle_errors(ApiTokenAuthError)
def auth_error(self, request, failure):
request.setResponseCode(401)
return b'Unauthorized'
@app.route('/messages/history/<string:account>', methods=['OPTIONS', 'GET'])
@app.route('/messages/history/<string:account>/<string:msg_id>', methods=['OPTIONS', 'GET'])
def messages(self, request, account, msg_id=None):
storage = MessageStorage()
token = storage.get_account_token(account)
if isinstance(token, defer.Deferred):
token.addCallback(lambda result: self.verify_api_token(request, account, msg_id, result))
return token
else:
return self.verify_api_token(request, account, msg_id, token)
class WebHandler(object):
def __init__(self):
self.backend = None
self.factory = None
self.resource = None
self.web = None
def start(self):
ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws'
self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version)
self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins,
allowNullOrigin=GeneralConfig.web_origins == ['*'],
autoPingInterval=GeneralConfig.websocket_ping_interval,
autoPingTimeout=GeneralConfig.websocket_ping_interval/2)
self.web = WebRTCGatewayWeb(self.factory)
server.register_resource(b'webrtcgateway', self.web.resource)
log.info('WebSocket handler started at %s' % ws_url)
log.info('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins))
log.info('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains))
log.info('Using Janus API: %s' % JanusConfig.api_url)
self.backend = JanusBackend()
self.backend.start()
def stop(self):
if self.factory is not None:
for conn in self.factory.connections.copy():
conn.dropConnection(abort=True)
self.factory = None
if self.backend is not None:
self.backend.stop()
self.backend = None
# TODO: This implementation is a prototype. Moving forward it probably makes sense to provide admin API
# capabilities for other applications too. This could be done in a number of ways:
#
# * On the main web server, under a /admin/ parent route.
# * On a separate web server, which could listen on a different IP and port.
#
# In either case, HTTPS aside, a token based authentication mechanism would be desired.
# Which one is best is not 100% clear at this point.
class AuthError(Exception): pass
class AdminWebHandler(object, metaclass=Singleton):
app = Klein()
def __init__(self):
self.listener = None
def start(self):
host, port = GeneralConfig.http_management_interface
# noinspection PyUnresolvedReferences
self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host)
log.info('Admin web handler started at http://%s:%d' % (host, port))
def stop(self):
if self.listener is not None:
self.listener.stopListening()
self.listener = None
# Admin web API
def _check_auth(self, request):
auth_secret = GeneralConfig.http_management_auth_secret
if auth_secret:
auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None)
if not auth_headers or auth_headers[0] != auth_secret:
raise AuthError()
@app.handle_errors(AuthError)
def auth_error(self, request, failure):
request.setResponseCode(403)
return 'Authentication error'
@app.route('/tokens/<string:account>')
def get_tokens(self, request, account):
self._check_auth(request)
request.setHeader('Content-Type', 'application/json')
storage = TokenStorage()
tokens = storage[account]
if isinstance(tokens, defer.Deferred):
return tokens.addCallback(lambda result: json.dumps({'tokens': result}))
else:
return json.dumps({'tokens': tokens})
@app.route('/tokens/<string:account>/<string:device_token>', methods=['DELETE'])
def process_token(self, request, account, device_token):
self._check_auth(request)
request.setHeader('Content-Type', 'application/json')
storage = TokenStorage()
if request.method == 'DELETE':
storage.remove(account, device_token)
return json.dumps({'success': True})

File Metadata

Mime Type
text/x-diff
Expires
Sat, Feb 1, 3:16 PM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3488947
Default Alt Text
(46 KB)

Event Timeline