diff --git a/sylk/applications/webrtcgateway/push.py b/sylk/applications/webrtcgateway/push.py index 57a27ed..0a20907 100644 --- a/sylk/applications/webrtcgateway/push.py +++ b/sylk/applications/webrtcgateway/push.py @@ -1,130 +1,130 @@ import json from twisted.internet import defer, reactor from twisted.web.client import Agent, readBody from twisted.web.iweb import IBodyProducer from twisted.web.http_headers import Headers from zope.interface import implementer from .configuration import GeneralConfig from .logger import log from .models import sylkpush from .storage import TokenStorage __all__ = 'conference_invite' agent = Agent(reactor) headers = Headers({'User-Agent': ['SylkServer'], 'Content-Type': ['application/json']}) @implementer(IBodyProducer) class BytesProducer(object): def __init__(self, data): self.body = data self.length = len(data) def startProducing(self, consumer): consumer.write(self.body) return defer.succeed(None) def pauseProducing(self): pass def stopProducing(self): pass def _construct_and_send(result, request, destination): - for device_token, push_parameters in result.items(): - request.token = device_token - request.app_id = push_parameters['app'] + for device, push_parameters in result.items(): + request.token = push_parameters['token'] + request.app_id = push_parameters['app_id'] request.platform = push_parameters['platform'] request.device_id = push_parameters['device_id'] - _send_push_notification(request, destination, device_token) + _send_push_notification(request, destination, request.token) def conference_invite(originator, destination, room, call_id, audio, video): tokens = TokenStorage() if video: media_type = 'video' else: media_type = 'audio' request = sylkpush.ConferenceInviteEvent(token='dummy', app_id='dummy', platform='dummy', device_id='dummy', originator=originator.uri, from_display_name=originator.display_name, to=room, call_id=str(call_id), media_type=media_type) user_tokens = tokens[destination] if isinstance(user_tokens, set): return else: if isinstance(user_tokens, defer.Deferred): user_tokens.addCallback(lambda result: _construct_and_send(result, request, destination)) else: _construct_and_send(user_tokens, request, destination) @defer.inlineCallbacks def _send_push_notification(payload, destination, token): if GeneralConfig.sylk_push_url: try: r = yield agent.request(b'POST', GeneralConfig.sylk_push_url.encode(), headers, BytesProducer(json.dumps(payload.__data__).encode()) ) except Exception as e: log.info('Error sending push notification to %s: %s', GeneralConfig.sylk_push_url, e) else: try: raw_body = yield readBody(r) except Exception as e: log.warning('Error reading response body: %s', e) else: try: body = json.loads(raw_body) except Exception as e: log.warning('Error parsing response body: %s', e) body = {} try: platform = body['data']['platform'] except KeyError: platform = 'Unknown platform' if r.code != 200: try: reason = body['data']['reason'] except KeyError: reason = None try: details = body['data']['body']['_content']['error']['message'] except KeyError: details = None if reason and details: error_description = "%s %s" % (reason, details) elif reason: error_description = reason else: error_description = body if r.code == 410: if body and 'application/json' in r.headers.getRawHeaders('content-type'): try: token = body['data']['token'] except KeyError: pass else: log.info('Purging expired push token %s/%s' % (destination, token)) tokens = TokenStorage() tokens.remove(destination, payload.app_id, payload.device_id) else: log.warning('Error sending %s push notification for videoroom to %s/%s: %s (%s) %s %s' % (platform.title(), payload.to, destination, token[:15], r.phrase.decode(), r.code, error_description)) else: log.info('Sent %s push notify for videoroom %s to %s/%s' % (platform.title(), payload.to, destination, token[:15])) else: log.warning('Cannot send push notification: no Sylk push server configured') diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index 50f1ed5..0ad1d98 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,665 +1,671 @@ import datetime import json import pickle as pickle import os from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from sipsimple.threading import run_in_thread from sipsimple.util import ISOTimestamp from twisted.internet import defer from types import SimpleNamespace from sylk.configuration import ServerConfig from .configuration import CassandraConfig from .errors import StorageError from .logger import log __all__ = 'TokenStorage', # TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, # and thus it's ok to scrap it after a reasonable amount of time. CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.policies import DCAwareRoundRobinPolicy from .models.storage.cassandra import PushTokens from .models.storage.cassandra import ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table class FileTokenStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params, user_agent): try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: token = contact_params['pn_tok'] background_token = None data = { 'device_id': contact_params['pn_device'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], 'app_id': contact_params['pn_app'], 'user_agent': user_agent, 'background_token': background_token } key = f"{data['app_id']}-{data['device_id']}" if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} # Remove old storage layout based on device id if contact_params['pn_device'] in self._tokens[account]: del self._tokens[account][contact_params['pn_device']] # Remove old storage layout based on token if token in self._tokens[account]: del self._tokens[account][token] # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: del self._tokens[account][contact_params['pn_tok']] except IndexError: pass self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, app_id, device_id): key = f'{app_id}-{device_id}' try: del self._tokens[account][key] except KeyError: pass self._save() class CassandraConnection(object, metaclass=Singleton): @run_in_thread('cassandra') def __init__(self): try: self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4) except NoHostAvailable: log.error("Not able to connect to any of the Cassandra contact points") raise StorageError class CassandraTokenStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): - tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.token, + tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'silent': device.silent, 'app_id': device.app_id, 'background_token': device.background_token} deferred.callback(tokens) return tokens query_tokens(key) return deferred @run_in_thread('cassandra') def add(self, account, contact_params, user_agent): username, domain = account.split('@', 1) - try: - (token, background_token) = contact_params['pn_tok'].split('-') - except ValueError: - token = contact_params['pn_tok'] - background_token = None + + token = contact_params['pn_tok'] + background_token = None + if contact_params['pn_type'] == 'ios': + try: + (token, background_token) = contact_params['pn_tok'].split('-') + except ValueError: + pass # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: - PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_token == contact_params['pn_tok']).if_exists().delete() - except LWTException: + old_token = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain)[0] + except (IndexError, LWTException): pass + else: + if old_token.device_token == contact_params['pn_tok']: + old_token.delete() try: PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], device_token=token, background_token=background_token, platform=contact_params['pn_type'], silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing token failed: {e}') raise StorageError @run_in_thread('cassandra') def remove(self, account, app_id, device_id): username, domain = account.split('@', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass class FileMessageStorage(object): def __init__(self): self._public_keys = defaultdict() self._accounts = defaultdict() self._storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') def _json_dateconverter(self, o): if isinstance(o, datetime.datetime): return o.__str__() @run_in_thread('file-io') def _save(self): with open(os.path.join(self._storage_path, 'accounts.json'), 'w+') as f: json.dump(self._accounts, f, default=self._json_dateconverter) with open(os.path.join(self._storage_path, 'public_keys.json'), 'w+') as f: json.dump(self._public_keys, f, default=self._json_dateconverter) def _save_messages(self, account, messages): with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'w+') as f: json.dump(messages, f, default=self._json_dateconverter, indent=4) def _save_id_by_timestamp(self, account, ids): with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'w+') as f: json.dump(ids, f, default=self._json_dateconverter) @run_in_thread('file-io') def load(self): makedirs(self._storage_path) try: accounts = json.load(open(os.path.join(self._storage_path, 'accounts.json'), 'r')) except (OSError, IOError): pass else: self._accounts.update(accounts) def _load_id_by_timestamp(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'r') as f: data = json.load(f) return data except (OSError, IOError) as e: raise e def _load_messages(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'r') as f: messages = json.load(f) return messages except (OSError, IOError) as e: raise e def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('file-io') def query(account, message_id): messages = [] timestamp = None try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): deferred.callback(messages) return if message_id is not None: try: timestamp = id_by_timestamp[message_id] except KeyError: deferred.callback(messages) return else: timestamp = ISOTimestamp(timestamp) try: messages = self._load_messages(account) except (OSError, IOError): deferred.callback(messages) return else: if timestamp is not None: messages = [message for message in messages if ISOTimestamp(message['created_at']) > timestamp] deferred.callback(messages) else: deferred.callback(messages) query(key[0], key[1]) return deferred def get_account(self, account): try: return SimpleNamespace(account=account, **self._accounts[account]) except KeyError: return None def get_account_token(self, account): try: if datetime.datetime.now() < datetime.datetime.fromisoformat(self._accounts[account]['token_expire']): return self._accounts[account]['api_token'] return None except KeyError: return None def add_account(self, account): timestamp = datetime.datetime.now() if account not in self._accounts: self._accounts[account] = {'last_login': timestamp} self._save() def add_account_token(self, account, token): timestamp = datetime.datetime.now() if account not in self._accounts: log.error(f'Updating API token for {account} failed') return StorageError self._accounts[account]['api_token'] = token self._accounts[account]['token_expire'] = timestamp + datetime.timedelta(seconds=26784000) self._save() @run_in_thread('file-io') def mark_conversation_read(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): return else: for idx, message in enumerate(messages): if message['contact'] == contact: message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) @run_in_thread('file-io') def update(self, account, state, message_id): try: messages = self._load_messages(account) except (OSError, IOError): return try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: for idx, message in enumerate(messages): if message['created_at'] == timestamp and message['message_id'] == message_id: if message['state'] != 'received': message['state'] = state if state == 'delivered': try: message['disposition'].remove('positive-delivery') except ValueError: pass elif state == 'displayed': message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) break @run_in_thread('file-io') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() messages = [] id_by_timestamp = {} try: messages = self._load_messages(account) except (OSError, IOError): makedirs(os.path.join(self._storage_path, account[0])) timestamp_not_found = True try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): pass else: try: created_at = id_by_timestamp[message_id] except KeyError: pass else: timestamp_not_found = False if content_type == 'message/imdn+json': return items = [n for n in messages if n['created_at'] == created_at and n['message_id'] == message_id] if len(items) == 1: return if content_type == 'text/pgp-public-key': key = account if direction == "outgoing" else contact self._public_keys[key] = {'content': content, 'created_at': timestamp} self._save() return if content_type == 'text/pgp-private-key': if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) return if not isinstance(disposition_notification, list) and disposition_notification == '': disposition_notification = [] message = {'account': account, 'direction': direction, 'contact': contact, 'content_type': content_type, 'content': content, 'created_at': timestamp, 'message_id': message_id, 'disposition': disposition_notification, 'state': state, 'msg_timestamp': msg_timestamp} messages.append(message) self._save_messages(account, messages) if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) @run_in_thread('file-io') def removeChat(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): pass else: for message in messages: if message['contact'] == contact: messages.remove(message) self._save_messages(account, messages) @run_in_thread('file-io') def removeMessage(self, account, message_id): try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: try: messages = self._load_messages(account) except (OSError, IOError): return else: item = [n for n in messages if n['created_at'] == timestamp and n['message_id'] == message_id] if len(item) == 1: messages.remove(item[0]) self._save_messages(account, messages) class CassandraMessageStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_messages(key, message_id): messages = [] try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except (IndexError, InvalidRequest): timestamp = datetime.datetime.now() - datetime.timedelta(days=3) else: timestamp = timestamp.created_at for message in ChatMessage.objects(ChatMessage.account == key, ChatMessage.created_at > timestamp): messages.append(message) deferred.callback(messages) query_messages(key[0], key[1]) return deferred def get_account(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account) query_tokens(account) return deferred def get_account_token(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account.api_token) query_tokens(account) return deferred @run_in_thread('cassandra') def add_account(self, account): timestamp = datetime.datetime.now() try: ChatAccount.create(account=account, last_login=timestamp) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing account failed: {e}') raise StorageError @run_in_thread('cassandra') def add_account_token(self, account, token): try: chat_account = ChatAccount.objects(account=account)[0] chat_account.ttl(2678400).update(api_token=token) except IndexError: log.error(f'Updating API token for {account} failed') raise StorageError @run_in_thread('cassandra') def mark_conversation_read(self, account, contact): for message in ChatMessage.objects(ChatMessage.account == account): if message.contact == contact: if message.content_type == 'application/sylk-conversation-read': message.delete() else: message.disposition.clear() message.save() def get_public_key(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_key(account): try: public_key = PublicKey.objects(PublicKey.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(public_key.public_key) query_key(account) return deferred @run_in_thread('cassandra') def update(self, account, state, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: message = ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, ChatMessage.message_id == message_id)[0] except IndexError: pass else: if message.state != 'received': message.state = state if state == 'delivered': try: message.disposition.remove('positive-delivery') except ValueError: pass elif state == 'displayed': message.disposition.clear() message.save() @run_in_thread('cassandra') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() timestamp_not_found = True try: created_at = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: pass else: timestamp_not_found = False timestamp = created_at.created_at if content_type == 'message/imdn+json': return if ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == created_at.created_at, ChatMessage.message_id == message_id).count() != 0: return if content_type == 'text/pgp-public-key': try: PublicKey.create(account=account if direction == "outgoing" else contact, public_key=content, created_at=timestamp) ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing public key failed: {e}') raise StorageError else: return if content_type == 'text/pgp-private-key': if timestamp_not_found: try: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest): pass return try: ChatMessage.create(account=account, direction=direction, contact=contact, content_type=content_type, content=content, created_at=timestamp, message_id=message_id, disposition=disposition_notification, state=state, msg_timestamp=msg_timestamp) if timestamp_not_found: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing message failed: {e}') raise StorageError @run_in_thread('cassandra') def removeChat(self, account, contact): try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: for message in messages: if message.contact == contact: message.delete() @run_in_thread('cassandra') def removeMessage(self, account, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, ChatMessage.message_id == message_id).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraTokenStorage() else: return FileTokenStorage() class MessageStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraMessageStorage() else: return FileMessageStorage()