diff --git a/sylk/applications/webrtcgateway/sip_handlers.py b/sylk/applications/webrtcgateway/sip_handlers.py index 14cec12..de42991 100644 --- a/sylk/applications/webrtcgateway/sip_handlers.py +++ b/sylk/applications/webrtcgateway/sip_handlers.py @@ -1,509 +1,515 @@ import json import random import os import secrets import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.system import unlink from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, FromHeader, ToHeader, Message, Request, RouteHeader, Route, Header from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads.imdn import IMDNDocument from sipsimple.payloads.rcsfthttp import FTHTTPDocument from sipsimple.streams.msrp.chat import CPIMPayload, Message as SIPMessage from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor, defer from zope.interface import implementer from sylk.configuration import SIPConfig from sylk.web import server from . import push from .configuration import GeneralConfig from .logger import log from .models import sylkrtc from .storage import MessageStorage from .datatypes import FileTransferData class ParsedSIPMessage(SIPMessage): __slots__ = 'message_id', 'disposition', 'destination' def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, message_id=None, disposition=None, destination=None): super(ParsedSIPMessage, self).__init__(content, content_type, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers) self.message_id = message_id self.disposition = disposition self.destination = destination @implementer(IObserver) class ReplicatedMessage(Message): def __init__(self, from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=None): super(ReplicatedMessage, self).__init__(from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=None) self._request = Request("MESSAGE", from_header.uri, from_header, to_header, route_header, credentials=credentials, extra_headers=extra_headers, content_type=content_type, body=body if isinstance(body, bytes) else body.encode()) @implementer(IObserver) class MessageHandler(object): def __init__(self): self.message_storage = MessageStorage() self.resolver = DNSLookup() self.from_header = None self.to_header = None self.content_type = None self.from_sip = None self.body = None self.parsed_message = None def _lookup_sip_target_route(self, uri): proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: sip_uri = SIPURI.parse('sip:%s' % uri) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e)) if not routes: raise DNSLookupError('DNS lookup error: no results found') route = random.choice([r for r in routes if r.transport == routes[0].transport]) log.debug('DNS lookup for SIP message proxy for {} yielded {}'.format(uri, route)) return route def _parse_message(self): cpim_message = None if self.content_type == "message/cpim": cpim_message = CPIMPayload.decode(self.body) body = cpim_message.content if isinstance(cpim_message.content, str) else cpim_message.content.decode() content_type = cpim_message.content_type sender = cpim_message.sender or self.from_header disposition = next(([item.strip() for item in header.value.split(',')] for header in cpim_message.additional_headers if header.name == 'Disposition-Notification'), None) message_id = next((header.value for header in cpim_message.additional_headers if header.name == 'Message-ID'), str(uuid.uuid4())) else: body = self.body.decode('utf-8') sender = self.from_header disposition = None message_id = str(uuid.uuid4()) content_type = str(self.content_type) timestamp = str(cpim_message.timestamp) if cpim_message is not None and cpim_message.timestamp is not None else str(ISOTimestamp.now()) sender = sylkrtc.SIPIdentity(uri=str(sender.uri), display_name=sender.display_name) destination = sylkrtc.SIPIdentity(uri=str(self.to_header.uri), display_name=self.to_header.display_name) if content_type == FTHTTPDocument.content_type: document = FTHTTPDocument.parse(body) for info in document: if info.type == 'file': transfer_data = FileTransferData(info.file_name.value, int(info.file_size.value), info.content_type.value, message_id, sender.uri, destination.uri, url=info.data.url) metadata = sylkrtc.TransferredFile(**transfer_data.__dict__) body = json.dumps(sylkrtc.FileTransferMessage(**metadata.__data__).__data__) content_type = 'application/sylk-file-transfer' self.parsed_message = ParsedSIPMessage(body, content_type, sender=sender, disposition=disposition, message_id=message_id, timestamp=timestamp, destination=destination) def _send_public_key(self, from_header, to_header, public_key): if public_key: self.outgoing_message(from_header, public_key, 'text/pgp-public-key', str(to_header)) def _handle_generate_token(self): account = f'{self.from_header.uri.user}@{self.from_header.uri.host}' log.info(f'Adding {account} for storing messages') self.message_storage.add_account(account) token = secrets.token_urlsafe() self.outgoing_message(self.from_header.uri, json.dumps({'token': token, 'url': f'{server.url}/webrtcgateway/messages/history/{account}'}), 'application/sylk-api-token') self.message_storage.add_account_token(account=account, token=token) def _handle_lookup_pgp_key(self): account = f'{self.to_header.uri.user}@{self.to_header.uri.host}' public_key = self.message_storage.get_public_key(account) log.info(f'Public key lookup for {account}') if isinstance(public_key, defer.Deferred): public_key.addCallback(lambda result: self._send_public_key(self.from_header.uri, self.to_header.uri, result)) else: self._send_public_key(self.from_header.uri, self.to_header.uri, public_key) def _handle_message_remove(self): account = f'{self.from_header.uri.user}@{self.from_header.uri.host}' contact = f'{self.to_header.uri.user}@{self.to_header.uri.host}' message_id = self.parsed_message.content - - self.message_storage.removeMessage(account=account, message_id=message_id) - - content = sylkrtc.AccountMessageRemoveEventData(contact=contact, message_id=message_id) - self.message_storage.add(account=account, - contact=contact, - direction='outgoing', - content=json.dumps(content.__data__), - content_type='application/sylk-message-remove', - timestamp=str(ISOTimestamp.now()), - disposition_notification='', - message_id=str(uuid.uuid4())) - - event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content) - self.outgoing_message(self.from_header.uri, json.dumps(content.__data__), 'application/sylk-message-remove', str(self.to_header.uri)) notification_center = NotificationCenter() - notification_center.post_notification(name='SIPApplicationGotAccountRemoveMessage', sender=account, data=event) def remove_message_from_receiver(msg_id, messages): for message in messages: if message.message_id == msg_id and message.direction == 'incoming': account = message.account message_id = message.message_id self.message_storage.removeMessage(account=account, message_id=message_id) content = sylkrtc.AccountMessageRemoveEventData(contact=message.contact, message_id=message_id, direction="incoming") self.message_storage.add(account=account, contact=message.contact, direction='incoming', content=json.dumps(content.__data__), content_type='application/sylk-message-remove', timestamp=str(ISOTimestamp.now()), disposition_notification='', message_id=str(uuid.uuid4())) event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content) notification_center.post_notification(name='SIPApplicationGotAccountRemoveMessage', sender=account, data=event) log.info("Removed receiver message") break - messages = self.message_storage[[contact, '']] - if isinstance(messages, defer.Deferred): - messages.addCallback(lambda result: remove_message_from_receiver(msg_id=message_id, messages=result)) - else: - remove_message_from_receiver(msg_id=message_id, messages=messages) + def update_other_endpoints(result): + if not result: + return + + content = sylkrtc.AccountMessageRemoveEventData(contact=contact, message_id=message_id) + self.message_storage.add(account=account, + contact=contact, + direction='outgoing', + content=json.dumps(content.__data__), + content_type='application/sylk-message-remove', + timestamp=str(ISOTimestamp.now()), + disposition_notification='', + message_id=str(uuid.uuid4())) + + event = sylkrtc.AccountSyncEvent(account=account, type='message', action='remove', content=content) + self.outgoing_message(self.from_header.uri, json.dumps(content.__data__), 'application/sylk-message-remove', str(self.to_header.uri)) + notification_center.post_notification(name='SIPApplicationGotAccountRemoveMessage', sender=account, data=event) + + messages = self.message_storage[[contact, '']] + if isinstance(messages, defer.Deferred): + messages.addCallback(lambda result: remove_message_from_receiver(msg_id=message_id, messages=result)) + else: + remove_message_from_receiver(msg_id=message_id, messages=messages) + + + removed = defer.maybeDeferred(self.message_storage.removeMessage, account, message_id) + removed.addCallback(lambda result: update_other_endpoints(result)) def _store_message_for_sender(self, account): if account is None: log.info('not storing %s message from non-existent account %s to %s' % (self.parsed_message.content_type, self.from_header.uri, '%s@%s' % (self.to_header.uri.user, self.to_header.uri.host))) return log.debug(f"storage is enabled for originator {account.account}") message = None ignored_content_types = ("application/im-iscomposing+xml", 'text/pgp-public-key', IMDNDocument.content_type) if self.parsed_message.content_type in ignored_content_types: return log.info('storing {content_type} message for account {originator} to {destination.uri}'.format(content_type=self.parsed_message.content_type, originator=account.account, destination=self.parsed_message.destination)) self.message_storage.add(account=account.account, contact=f'{self.to_header.uri.user}@{self.to_header.uri.host}', direction="outgoing", content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp), disposition_notification=self.parsed_message.disposition, message_id=self.parsed_message.message_id, state='accepted') message = sylkrtc.AccountSyncEvent(account=account.account, type='message', action='add', content=sylkrtc.AccountMessageRequest( transaction='1', account=account.account, uri=f'{self.to_header.uri.user}@{self.to_header.uri.host}', message_id=self.parsed_message.message_id, content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp), server_generated=True )) notification_center = NotificationCenter() notification_center.post_notification(name='SIPApplicationGotOutgoingAccountMessage', sender=account.account, data=message) def _store_message_for_receiver(self, account): if account is None: log.info('not storing %s message from %s to non-existent account %s' % (self.parsed_message.content_type, self.from_header.uri, '%s@%s' % (self.to_header.uri.user, self.to_header.uri.host))) return log.debug(f'processing message from {self.from_header.uri} for account {account.account}') message = None notification_center = NotificationCenter() if self.parsed_message.content_type == "application/im-iscomposing+xml": return if self.parsed_message.content_type == IMDNDocument.content_type: document = IMDNDocument.parse(self.parsed_message.content) imdn_message_id = document.message_id.value imdn_status = document.notification.status.__str__() imdn_datetime = document.datetime.__str__() log.info('storing IMDN message ({status}) from {originator.uri}'.format(status=imdn_status, originator=self.parsed_message.sender)) self.message_storage.update(account=account.account, state=imdn_status, message_id=imdn_message_id) self.message_storage.update(account=str(self.parsed_message.sender.uri), state=imdn_status, message_id=imdn_message_id) message = sylkrtc.AccountDispositionNotificationEvent(account=account.account, state=imdn_status, message_id=imdn_message_id, message_timestamp=imdn_datetime, timestamp=str(self.parsed_message.timestamp), code=200, reason='') imdn_message_event = message.__data__ # del imdn_message_event['account'] ## Maybe prevent multiple imdn rows? self.message_storage.add(account=account.account, contact=self.parsed_message.sender.uri, direction="incoming", content=json.dumps(imdn_message_event), content_type='message/imdn', timestamp=str(self.parsed_message.timestamp), disposition_notification='', message_id=self.parsed_message.message_id, state='received') notification_center.post_notification(name='SIPApplicationGotAccountDispositionNotification', sender=account.account, data=NotificationData(message=message, sender=self.parsed_message.sender)) else: log.info('storing {content_type} message from {originator.uri} for account {account}'.format(content_type=self.parsed_message.content_type, originator=self.parsed_message.sender, account=account.account)) self.message_storage.add(account=account.account, contact=str(self.parsed_message.sender.uri), direction='incoming', content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp), disposition_notification=self.parsed_message.disposition, message_id=self.parsed_message.message_id, state='received') message = sylkrtc.AccountMessageEvent(account=account.account, sender=self.parsed_message.sender, content=self.parsed_message.content, content_type=self.parsed_message.content_type, timestamp=str(self.parsed_message.timestamp), disposition_notification=self.parsed_message.disposition, message_id=self.parsed_message.message_id) notification_center.post_notification(name='SIPApplicationGotAccountMessage', sender=account.account, data=message) if self.parsed_message.content_type in ('text/plain', 'text/html', 'application/sylk-file-transfer'): def get_unread_messages(messages, originator): unread = 1 for message in messages: if (message.content_type in ('text/plain', 'text/html', 'application/sylk-file-transfer') and message.direction == 'incoming' and message.contact != account.account and 'display' in message.disposition): unread += 1 # log.info(f'{message.disposition} {message.contact} {message.state}') # log.info(f'there are {unread} messages') push.message(originator=originator, destination=account.account, call_id=str(uuid.uuid4()), badge=unread) messages = self.message_storage[[account.account, '']] if isinstance(messages, defer.Deferred): messages.addCallback(lambda result: get_unread_messages(messages=result, originator=message.sender)) else: get_unread_messages(messages=messages, originator=message.sender) def incoming_message(self, data, content_type=None): self.content_type = content_type if content_type is not None else data.headers.get('Content-Type', Null).content_type self.from_header = data.headers.get('From', Null) self.to_header = data.headers.get('To', Null) self.body = data.body self.from_sip = data.headers.get('X-Sylk-From-Sip', Null) self._parse_message() if self.parsed_message.content_type == 'application/sylk-api-token': self._handle_generate_token() return if self.parsed_message.content_type == 'application/sylk-api-pgp-key-lookup': self._handle_lookup_pgp_key() return if self.parsed_message.content_type == 'application/sylk-api-message-remove': self._handle_message_remove() return if self.from_sip is not Null: log.debug("message is originating from SIP endpoint") sender_account = self.message_storage.get_account(f'{self.from_header.uri.user}@{self.from_header.uri.host}') if isinstance(sender_account, defer.Deferred): sender_account.addCallback(lambda result: self._store_message_for_sender(result)) else: self._store_message_for_sender(sender_account) account = self.message_storage.get_account(f'{self.to_header.uri.user}@{self.to_header.uri.host}') if isinstance(account, defer.Deferred): account.addCallback(lambda result: self._store_message_for_receiver(result)) else: self._store_message_for_receiver(account) @run_in_green_thread def _outgoing_message(self, to_uri, from_uri, content, content_type='text/plain', headers=[], route=None, message_type=Message, subscribe=True): if not route: return from_uri = SIPURI.parse('%s' % from_uri) to_uri = SIPURI.parse('%s' % to_uri) content = content if isinstance(content, bytes) else content.encode() message_request = message_type(FromHeader(from_uri), ToHeader(to_uri), RouteHeader(route.uri), content_type, content, extra_headers=headers) if subscribe: notification_center = NotificationCenter() notification_center.add_observer(self, sender=message_request) message_request.send() @run_in_green_thread def outgoing_message(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]): route = self._lookup_sip_target_route(uri) if route: if identity is None: identity = f'sip:sylkserver@{SIPConfig.local_ip}' log.info("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route)) headers = [Header('X-Sylk-To-Sip', 'yes')] + extra_headers self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route) @run_in_green_thread def outgoing_message_to_self(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]): route = Route(address=SIPConfig.local_ip, port=SIPConfig.local_tcp_port, transport='tcp') if route: if identity is None: identity = f'sip:sylkserver@{SIPConfig.local_ip}' log.debug("sending message from '%s' to '%s' to self %s" % (identity, uri, route)) headers = [Header('X-Sylk-From-Sip', 'yes')] + extra_headers self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route, subscribe=False) @run_in_green_thread def outgoing_replicated_message(self, uri, content, content_type='text/plain', identity=None, extra_headers=[]): route = self._lookup_sip_target_route(identity) if route: if identity is None: identity = f'sip:sylkserver@{SIPConfig.local_ip}' log.info("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route)) headers = [Header('X-Sylk-To-Sip', 'yes'), Header('X-Replicated-Message', 'yes')] + extra_headers self._outgoing_message(uri, identity, content, content_type, headers=headers, route=route, message_type=ReplicatedMessage) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) log.info('outgoing message was accepted by remote party') def _NH_SIPMessageDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) data = notification.data reason = data.reason.decode() if isinstance(data.reason, bytes) else data.reason log.warning('could not deliver outgoing message %d %s' % (data.code, reason)) @implementer(IObserver) class FileTransferHandler(object): def __init__(self): self.session = None self.stream = None self.handler = None self.direction = None def init_incoming(self, stream): self.direction = 'incoming' self.stream = stream self.session = stream.session self.handler = stream.handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.stream) notification_center.add_observer(self, sender=self.handler) @run_in_green_thread def init_outgoing(self, destination, file): self.direction = 'outgoing' def _terminate(self, failure_reason=None): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.handler) if failure_reason is None: if self.direction == 'incoming' and self.stream.direction == 'recvonly': if not hasattr(self.session, 'transfer_data'): return transfer_data = self.session.transfer_data metadata = sylkrtc.TransferredFile(**transfer_data.__dict__, hash=self.stream.file_selector.hash) meta_filepath = os.path.join(transfer_data.path, f'meta-{metadata.filename}') try: with open(meta_filepath, 'w+') as output_file: output_file.write(json.dumps(metadata.__data__)) except (OSError, IOError): unlink(meta_filepath) log.warning('Could not save metadata %s' % meta_filepath) log.info('File transfer finished, saved to %s' % transfer_data.full_path) message_handler = MessageHandler() cpim_payload = transfer_data.cpim_message_payload(metadata) message_handler.outgoing_message_to_self(f'sip:{metadata.receiver.uri}', cpim_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}') xml_payload = transfer_data.cpim_rcsfthttp_message_payload(metadata) message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}') message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}') if not metadata.filename.endswith('.asc'): payload = transfer_data.message_payload message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}') message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', payload, identity=f'sip:{metadata.sender.uri}') else: pass self.session = None self.stream = None self.handler = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidNotInitialize(self, notification): self._terminate(failure_reason=notification.data.reason) def _NH_FileTransferHandlerDidEnd(self, notification): if self.direction == 'incoming': if self.stream.direction == 'sendonly': reactor.callLater(3, self.session.end) else: reactor.callLater(1, self.session.end) else: self.session.end() self._terminate(failure_reason=notification.data.reason) diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index 3df47d4..cdd06ed 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,751 +1,766 @@ import datetime import json import pickle as pickle import os from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from sipsimple.threading import run_in_thread from sipsimple.util import ISOTimestamp from shutil import rmtree from twisted.internet import defer from types import SimpleNamespace from sylk.configuration import ServerConfig from .configuration import CassandraConfig from .datatypes import FileTransferData from .errors import StorageError from .logger import log __all__ = 'TokenStorage', # TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, # and thus it's ok to scrap it after a reasonable amount of time. CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.policies import DCAwareRoundRobinPolicy from .models.storage.cassandra import PushTokens from .models.storage.cassandra import ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table class FileTokenStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params, user_agent): try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: token = contact_params['pn_tok'] background_token = None data = { 'device_id': contact_params['pn_device'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], 'app_id': contact_params['pn_app'], 'user_agent': user_agent, 'background_token': background_token, 'device_token': token } key = f"{data['app_id']}-{data['device_id']}" if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} # Remove old storage layout based on device id if contact_params['pn_device'] in self._tokens[account]: del self._tokens[account][contact_params['pn_device']] # Remove old storage layout based on token if token in self._tokens[account]: del self._tokens[account][token] # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: del self._tokens[account][contact_params['pn_tok']] except (IndexError, KeyError): pass self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, app_id, device_id): key = f'{app_id}-{device_id}' try: del self._tokens[account][key] except KeyError: pass self._save() def removeAll(self, account): try: del self._tokens[account] except KeyError: pass self._save() class CassandraConnection(object, metaclass=Singleton): @run_in_thread('cassandra') def __init__(self): try: from cassandra.io import twistedreactor self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=twistedreactor.TwistedConnection) except NoHostAvailable: log.error("Not able to connect to any of the Cassandra contact points") raise StorageError class CassandraTokenStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'silent': device.silent, 'app_id': device.app_id, 'background_token': device.background_token} deferred.callback(tokens) return tokens query_tokens(key) return deferred @run_in_thread('cassandra') def add(self, account, contact_params, user_agent): username, domain = account.split('@', 1) token = contact_params['pn_tok'] background_token = None if contact_params['pn_type'] == 'ios': try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: pass # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: old_token = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain)[0] except (IndexError, LWTException): pass else: if old_token.device_token == contact_params['pn_tok']: old_token.delete() try: PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], device_token=token, background_token=background_token, platform=contact_params['pn_type'], silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing token failed: {e}') raise StorageError @run_in_thread('cassandra') def remove(self, account, app_id, device_id): username, domain = account.split('@', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass class FileMessageStorage(object): def __init__(self): self._public_keys = defaultdict() self._accounts = defaultdict() self._storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') def _json_dateconverter(self, o): if isinstance(o, datetime.datetime): return o.__str__() @run_in_thread('file-io') def _save(self): with open(os.path.join(self._storage_path, 'accounts.json'), 'w+') as f: json.dump(self._accounts, f, default=self._json_dateconverter) with open(os.path.join(self._storage_path, 'public_keys.json'), 'w+') as f: json.dump(self._public_keys, f, default=self._json_dateconverter) def _save_messages(self, account, messages): with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'w+') as f: json.dump(messages, f, default=self._json_dateconverter, indent=4) def _save_id_by_timestamp(self, account, ids): with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'w+') as f: json.dump(ids, f, default=self._json_dateconverter) @run_in_thread('file-io') def load(self): makedirs(self._storage_path) try: accounts = json.load(open(os.path.join(self._storage_path, 'accounts.json'), 'r')) except (OSError, IOError): pass else: self._accounts.update(accounts) try: public_keys = json.load(open(os.path.join(self._storage_path, 'public_keys.json'), 'r')) except (OSError, IOError): pass else: self._public_keys.update(public_keys) def _load_id_by_timestamp(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'r') as f: data = json.load(f) return data except (OSError, IOError) as e: raise e def _load_messages(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'r') as f: messages = json.load(f) return messages except (OSError, IOError) as e: raise e def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('file-io') def query(account, message_id, since): messages = [] timestamp = None try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): deferred.callback(messages) return if message_id is not None: try: timestamp = id_by_timestamp[message_id] except KeyError: deferred.callback(messages) return else: timestamp = ISOTimestamp(timestamp) if since and not message_id: timestamp = ISOTimestamp(since) try: messages = self._load_messages(account) except (OSError, IOError): deferred.callback(messages) return else: if timestamp is not None: messages = [message for message in messages if ISOTimestamp(message['created_at']) > timestamp] deferred.callback(messages) else: deferred.callback(messages) try: since = key[2] except IndexError: since = None query(key[0], key[1], since) return deferred def get_account(self, account): try: return SimpleNamespace(account=account, **self._accounts[account]) except KeyError: return None def get_account_token(self, account): try: if datetime.datetime.now() < datetime.datetime.fromisoformat(str(self._accounts[account]['token_expire'])): return self._accounts[account]['api_token'] return None except KeyError: return None def remove_account_token(self, account): if account in self._accounts: self._accounts[account]['api_token'] = '' self._save() def add_account(self, account): timestamp = datetime.datetime.now() if account not in self._accounts: self._accounts[account] = {'last_login': timestamp} self._save() else: self._accounts[account]['last_login'] = timestamp self._save() def get_public_key(self, account): try: return self._public_keys[account]['content'] except KeyError: return None def remove_account(self, account): if account in self._accounts: del self._accounts[account] self._save() def remove_public_key(self, account): if account in self._public_keys: del self.public_keys[account] self._save() def add_account_token(self, account, token): timestamp = datetime.datetime.now() if account not in self._accounts: log.error(f'Updating API token for {account} failed') return StorageError self._accounts[account]['api_token'] = token self._accounts[account]['token_expire'] = timestamp + datetime.timedelta(seconds=26784000) self._save() @run_in_thread('file-io') def mark_conversation_read(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): return else: for idx, message in enumerate(messages): if message['contact'] == contact: message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) @run_in_thread('file-io') def update(self, account, state, message_id): try: messages = self._load_messages(account) except (OSError, IOError): return try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: for idx, message in enumerate(messages): if message['created_at'] == timestamp and message['message_id'] == message_id: if message['state'] != 'received': message['state'] = state if state == 'delivered': try: message['disposition'].remove('positive-delivery') except ValueError: pass elif state == 'displayed': message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) break @run_in_thread('file-io') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() messages = [] id_by_timestamp = {} try: messages = self._load_messages(account) except (OSError, IOError): makedirs(os.path.join(self._storage_path, account[0])) timestamp_not_found = True try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): pass else: try: created_at = id_by_timestamp[message_id] except KeyError: pass else: timestamp_not_found = False if content_type == 'message/imdn+json': return items = [n for n in messages if n['created_at'] == created_at and n['message_id'] == message_id] if len(items) == 1: return if content_type == 'text/pgp-public-key': key = account if direction == "outgoing" else contact self._public_keys[key] = {'content': content, 'created_at': timestamp} self._save() return if content_type == 'text/pgp-private-key': if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) return if not isinstance(disposition_notification, list) and disposition_notification == '': disposition_notification = [] message = {'account': account, 'direction': direction, 'contact': contact, 'content_type': content_type, 'content': content, 'created_at': timestamp, 'message_id': message_id, 'disposition': disposition_notification, 'state': state, 'msg_timestamp': msg_timestamp} messages.append(message) self._save_messages(account, messages) if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) @run_in_thread('file-io') def removeChat(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): pass else: messages[:] = [message for message in messages if message['contact'] != contact] self._save_messages(account, messages) @run_in_thread('file-io') def removeMessage(self, account, message_id): try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): - return + return False else: try: timestamp = id_by_timestamp[message_id] except KeyError: - return + return False else: try: messages = self._load_messages(account) except (OSError, IOError): return else: item = [n for n in messages if n['created_at'] == timestamp and n['message_id'] == message_id] if len(item) == 1: if item[0]['content_type'] == 'application/sylk-file-transfer': data = FileTransferData('', '', '', item[0]['message_id'], item[0]['account'], item[0]['contact']) try: rmtree(data.path) except FileNotFoundError: pass else: print(f"Removed {data.path}") messages.remove(item[0]) self._save_messages(account, messages) + return True + return False class CassandraMessageStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_messages(key, message_id, since): messages = [] try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except (IndexError, InvalidRequest): timestamp = datetime.datetime.now() - datetime.timedelta(days=3) else: timestamp = timestamp.created_at if since and not message_id: timestamp = ISOTimestamp(since) for message in ChatMessage.objects(ChatMessage.account == key, ChatMessage.created_at > timestamp): timestamp_naive = message.msg_timestamp try: timestamp_utc = timestamp_naive.replace(tzinfo=datetime.timezone.utc) except AttributeError: continue message.msg_timestamp = timestamp_utc messages.append(message) deferred.callback(messages) try: since = key[2] except IndexError: since = None query_messages(key[0], key[1], since) return deferred def get_account(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account) query_tokens(account) return deferred def get_account_token(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account.api_token) query_tokens(account) return deferred @run_in_thread('cassandra') def add_account(self, account): timestamp = datetime.datetime.now() try: ChatAccount.create(account=account, last_login=timestamp) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing account failed: {e}') raise StorageError @run_in_thread('cassandra') def add_account_token(self, account, token): try: chat_account = ChatAccount.objects(account=account)[0] chat_account.ttl(2678400).update(api_token=token) except IndexError: log.error(f'Updating API token for {account} failed') raise StorageError @run_in_thread('cassandra') def mark_conversation_read(self, account, contact): for message in ChatMessage.objects(ChatMessage.account == account): if message.contact == contact: if message.content_type == 'application/sylk-conversation-read': message.delete() else: message.disposition.clear() message.save() def get_public_key(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_key(account): try: public_key = PublicKey.objects(PublicKey.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(public_key.public_key) query_key(account) return deferred @run_in_thread('cassandra') def update(self, account, state, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: message = ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, ChatMessage.message_id == message_id)[0] except IndexError: pass else: if message.state != 'received': message.state = state if state == 'delivered': try: message.disposition.remove('positive-delivery') except ValueError: pass elif state == 'displayed': message.disposition.clear() message.save() @run_in_thread('cassandra') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() timestamp_not_found = True try: created_at = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: pass else: timestamp_not_found = False timestamp = created_at.created_at if content_type == 'message/imdn+json': return if ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == created_at.created_at, ChatMessage.message_id == message_id).count() != 0: return if content_type == 'text/pgp-public-key': try: PublicKey.create(account=account if direction == "outgoing" else contact, public_key=content, created_at=timestamp) ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing public key failed: {e}') raise StorageError else: return if content_type == 'text/pgp-private-key': if timestamp_not_found: try: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest): pass return try: ChatMessage.create(account=account, direction=direction, contact=contact, content_type=content_type, content=content, created_at=timestamp, message_id=message_id, disposition=disposition_notification, state=state, msg_timestamp=msg_timestamp) if timestamp_not_found: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing message failed: {e}') raise StorageError @run_in_thread('cassandra') def removeChat(self, account, contact): try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: for message in messages: if message.contact == contact: message.delete() - @run_in_thread('cassandra') def removeMessage(self, account, message_id): - try: - timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] - except IndexError: - return - else: + deferred = defer.Deferred() + + @run_in_thread('cassandra') + def remove(account, message_id): try: - messages = ChatMessage.objects(ChatMessage.account == account, - ChatMessage.created_at == timestamp.created_at, - # ChatMessage.message_id == message_id).if_exists().delete() - ChatMessage.message_id == message_id) - - for message in messages: - if message.content_type == 'application/sylk-file-transfer': - data = FileTransferData('', '', '', message.message_id, message.account, message.contact) - try: - rmtree(data.path) - except FileNotFoundError: - pass - else: - print(f"Removed {data.path}") - - messages.if_exists().delete() - except LWTException: - pass + timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] + except IndexError: + deferred.callback(False) + return + else: + try: + result = True + messages = ChatMessage.objects(ChatMessage.account == account, + ChatMessage.created_at == timestamp.created_at, + # ChatMessage.message_id == message_id).if_exists().delete() + ChatMessage.message_id == message_id) + + for message in messages: + if message.content_type == 'application/sylk-file-transfer': + data = FileTransferData('', '', '', message.message_id, message.account, message.contact) + try: + rmtree(data.path) + except FileNotFoundError: + pass + else: + print(f"Removed {data.path}") + if not messages: + result = False + print('message not removed') + else: + print('message removed') + + messages.if_exists().delete() + deferred.callback(result) + except LWTException: + pass + remove(account, message_id) + return deferred class TokenStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraTokenStorage() else: return FileTokenStorage() class MessageStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraMessageStorage() else: return FileMessageStorage()