Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7232118
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/sylk/applications/webrtcgateway/__init__.py b/sylk/applications/webrtcgateway/__init__.py
index e17b6c5..2ce4330 100644
--- a/sylk/applications/webrtcgateway/__init__.py
+++ b/sylk/applications/webrtcgateway/__init__.py
@@ -1,206 +1,205 @@
import os
import time
-import uuid
import errno
from application.notification import IObserver, NotificationCenter
from application.python import Null
from application.system import unlink
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.streams.msrp.chat import CPIMPayload, CPIMParserError
from sipsimple.threading import run_in_thread
from twisted.internet import defer, reactor
from zope.interface import implementer
from sylk.applications import SylkApplication
from sylk.session import IllegalStateError
from .configuration import GeneralConfig
from .datatypes import FileTransferData
from .sip_handlers import FileTransferHandler, MessageHandler
from .logger import log
from .storage import TokenStorage, MessageStorage
from .web import WebHandler, AdminWebHandler
@implementer(IObserver)
class WebRTCGatewayApplication(SylkApplication):
def __init__(self):
self.web_handler = WebHandler()
self.admin_web_handler = AdminWebHandler()
def start(self):
self.web_handler.start()
self.admin_web_handler.start()
# Load tokens from the storage
token_storage = TokenStorage()
token_storage.load()
# Setup message storage
message_storage = MessageStorage()
message_storage.load()
self.clean_filetransfers()
def stop(self):
self.web_handler.stop()
self.admin_web_handler.stop()
@run_in_thread('file-io')
def clean_filetransfers(self):
settings = SIPSimpleSettings()
top = settings.file_transfer.directory.normalized
removed_dirs = removed_files = 0
for root, dirs, files in os.walk(top, topdown=False):
for name in files:
file = os.path.join(root, name)
statinfo = os.stat(file)
current_time = time.time()
remove_after_days = GeneralConfig.filetransfer_expire_days
if (statinfo.st_size >= 1024 * 1024 * 50 and statinfo.st_mtime < current_time - 86400 * remove_after_days):
log.info(f"[housekeeper] Removing expired filetransfer file: {file}")
removed_files += 1
unlink(file)
elif statinfo.st_mtime < current_time - 86400 * 2 * remove_after_days:
log.info(f"[housekeeper] Removing expired file transfer file: {file}")
removed_files += 1
unlink(file)
for name in dirs:
dir = os.path.join(root, name)
try:
os.rmdir(dir)
except OSError as ex:
if ex.errno == errno.ENOTEMPTY:
pass
else:
removed_dirs += 1
log.info(f"[housekeeper] Removing expired file transfer dir {dir}")
log.info(f"[housekeeper] Removed {removed_files} files, {removed_dirs} directories")
reactor.callLater(3600, self.clean_filetransfers)
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def incoming_session(self, session):
# TODO: handle diverted sessions?
log.info('New incoming session {session.call_id} from sip:{uri.user}@{uri.host}'.format(session=session, uri=session.remote_identity.uri))
transfer_streams = [stream for stream in session.proposed_streams if stream.type == 'file-transfer']
if not transfer_streams:
log.info(u'Session rejected: invalid media')
session.reject(488)
return
transfer_stream = transfer_streams[0]
if transfer_stream.direction == 'sendonly':
# file transfer 'pull'
log.info('Session rejected: requested file not found')
session.reject(404)
return
sender = f'{session.remote_identity.uri.user}@{session.remote_identity.uri.host}'
receiver = f'{session.local_identity.uri.user}@{session.local_identity.uri.host}'
file_selector = transfer_stream.file_selector
transfer_data = FileTransferData(file_selector.name,
file_selector.size,
file_selector.type,
- str(uuid.uuid4()),
+ transfer_stream.transfer_id,
sender,
receiver)
session.transfer_data = transfer_data
message_storage = MessageStorage()
account = defer.maybeDeferred(message_storage.get_account, receiver)
account.addCallback(lambda result: self._check_receiver_session(result))
sender_account = defer.maybeDeferred(message_storage.get_account, sender)
sender_account.addCallback(lambda result: self._check_sender_session(result, session))
d1 = defer.DeferredList([account, sender_account], consumeErrors=True)
d1.addCallback(lambda result: self._handle_lookup_result(result, session, transfer_stream))
NotificationCenter().add_observer(self, sender=session)
def _check_receiver_session(self, account):
if account is None:
raise Exception("Receiver account for filetransfer not found")
def _check_sender_session(self, account, session):
if account is None:
session.transfer_data.update_path_for_receiver()
raise Exception("Sender account for filetransfer not found")
def _handle_lookup_result(self, result, session, stream):
reject_session = all([success is not True for (success, value) in result])
if reject_session:
self._reject_session(session, "Sender and receiver accounts for filetransfer were not found")
return
stream.handler.save_directory = session.transfer_data.path
log.info('File transfer from {sender.uri} to {receiver.uri} will be saved to {path}/{filename}'.format(**session.transfer_data.__dict__))
self._accept_session(session, [stream])
def _reject_session(self, session, error):
log.warning(f'File transfer rejected: {error}')
session.reject(404)
def _accept_session(self, session, streams):
try:
session.accept(streams)
except IllegalStateError:
session.reject(500)
def incoming_subscription(self, request, data):
request.reject(405)
def incoming_referral(self, request, data):
request.reject(405)
def incoming_message(self, message_request, data):
content_type = data.headers.get('Content-Type', Null).content_type
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
if Null in (content_type, from_header, to_header):
message_request.answer(400)
return
if content_type == 'message/cpim':
try:
cpim_message = CPIMPayload.decode(data.body)
except (CPIMParserError, UnicodeDecodeError): # TODO: fix decoding in sipsimple
log.warning('SIP message from %s to %s rejected: CPIM parse error' % (from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host)))
message_request.answer(400)
return
else:
content_type = cpim_message.content_type
log.info('received SIP message (%s) from %s to %s' % (content_type, from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host)))
message_request.answer(200)
message_handler = MessageHandler()
message_handler.incoming_message(data)
def _NH_SIPSessionDidStart(self, notification):
session = notification.sender
try:
transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer')
except StopIteration:
pass
else:
transfer_handler = FileTransferHandler()
transfer_handler.init_incoming(transfer_stream)
def _NH_SIPSessionDidEnd(self, notification):
session = notification.sender
notification.center.remove_observer(self, sender=session)
def _NH_SIPSessionDidFail(self, notification):
session = notification.sender
notification.center.remove_observer(self, sender=session)
log.info('File transfer from %s to %s failed: %s (%s)' % (session.remote_identity.uri, session.local_identity.uri, notification.data.reason, notification.data.failure_reason))
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Dec 28, 11:17 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3454276
Default Alt Text
(8 KB)
Attached To
Mode
rSYLK SylkServer
Attached
Detach File
Event Timeline
Log In to Comment