diff --git a/sylk-db b/sylk-db index 9ce1527..61d3508 100755 --- a/sylk-db +++ b/sylk-db @@ -1,750 +1,750 @@ #!/usr/bin/env python3 import argparse import sys import os from application import log from application.process import process CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine.query import LWTException from cassandra.cluster import Cluster, Session, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import DCAwareRoundRobinPolicy from cassandra.cqlengine.management import sync_table, create_keyspace_simple class colors: if sys.stdout.isatty(): TGREEN = '\033[32m' ENDC = '\033[m' BOLD = '\033[1m' else: TGREEN = '' ENDC = '' BOLD = '' class parse_level: def format(record): if record.levelname != 'INFO': return f'{record.levelname:<8s} ' else: return f"{' ......':<8s} " class parse_level_indent: def format(record): if record.levelname != 'INFO': return f'{record.levelname:<16s} ' else: return f"{' ......':<16s} " def ask(question): try: while (res := input(colors.TGREEN + f"{'>>':>8s} {question} (Enter y/n) " + colors.ENDC).lower()) not in {"y", "n"}:pass except KeyboardInterrupt: sys.exit(1) if res == "y": return True return False def ask_num(question, num_range): while True: try: res = input(colors.TGREEN + f"{'>>':>8s} {question} (Enter a number [0-{num_range}] or all) " + colors.ENDC) except (EOFError, KeyboardInterrupt): print('\n') sys.exit(1) try: res = int(res) except ValueError: if res == 'all': break log.info('Enter a valid number') continue else: if res not in range(0, num_range): log.info('Enter a valid number') continue break return res def bold(string): return colors.BOLD + string + colors.ENDC def green(string): return colors.TGREEN + string + colors.ENDC def bold_green(string): return colors.BOLD + colors.TGREEN + string + colors.ENDC def db_init(): log.info(f"\n{' SylkServer - Cassandra database create/maintenance ':*^80s}\n") log.warn('Please note, this script can potentially destroy the data in the configured Cassandra keyspace.') log.warn('Make sure you have a backup if you already have data in the Cassandra cluster') if not ask("Would you like to continue?"): sys.exit() os.environ['CQLENG_ALLOW_SCHEMA_MANAGEMENT'] = '1' from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE: if CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) if CassandraConfig.keyspace in cluster.metadata.keyspaces: log.info(f"Keyspace {bold(CassandraConfig.keyspace)} is already on the server.") else: log.warning(f"Keyspace {bold(CassandraConfig.keyspace)} is {bold('not')} defined on the server") if ask("Would you like to create the keyspace with SimpleStrategy?"): create_keyspace_simple(CassandraConfig.keyspace, 1) else: sys.exit(1) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace if table.__table_name__ in cluster.metadata.keyspaces[CassandraConfig.keyspace].tables: log.info(f'Table {bold(table.__table_name__)} is in the keyspace {bold(CassandraConfig.keyspace)}') if ask("Would you like to update the schema with the model?"): sync_table(table) else: log.info(f'Table {bold(table.__table_name__)} is not the keyspace {bold(CassandraConfig.keyspace)}') if ask("Would you like to create the table from the model?"): sync_table(table) else: log.warning("Cassandra cluster contact points are not set, please adjust webrtcgateway.ini'") sys.exit() else: log.warning('The python Cassandra drivers are not installed, please make sure they are installed') sys.exit() def remove_account(account): log.info(f"\n{' SylkServer - Remove account ':*^80s}\n") log.warn(f'Please note, will destroy {bold("ALL")} data of the account {account}') if not ask("Would you like to continue?"): sys.exit() - from sylk.applications.webrtcgateway.configuration import CassandraConfig + from sylk.applications.webrtcgateway.configuration import CassandraConfig, FileStorageConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: size = len(messages) for message in messages: message.delete() log.info(f'Removed {size} messages from {account}') try: accounts = ChatAccount.objects(ChatAccount.account == account) except LWTException: pass else: size = len(accounts) for acc in accounts: acc.delete() log.info(f'Removed {account} from accounts') username, domain = account.split('@') try: push_tokens = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain) except LWTException: pass else: size = len(push_tokens) for token in push_tokens: token.delete() log.info(f'Removed {size} push tokens from {account}') try: public_keys = PublicKey.objects(PublicKey.account == account) except LWTException: pass else: size = len(public_keys) for key in public_keys: key.delete() pass log.info(f'Removed public Key from {account}') else: log.warning('The python Cassandra drivers are not installed.') log.warning('We will use the JSON and pickle backend to wipe the account data') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.storage import TokenStorage, MessageStorage from sipsimple.threading import ThreadManager import time log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 storage = TokenStorage() storage.load() time.sleep(.5) storage.removeAll(account) storage = MessageStorage() storage.load() time.sleep(.5) storage.remove_account(account) storage.remove_public_key(account) - storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') + storage_path = os.path.join(FileStorageConfig.storage_dir, 'conversations') if os.path.exists(os.path.join(storage_path, account[0], f'{account}_messages.json')): os.remove(os.path.join(storage_path, account[0], f'{account}_messages.json')) ThreadManager().stop() def remove_messages(account, contact): log.info(f"\n{' SylkServer - Remove messages ':*^80s}\n") if contact is not None: log.warn(f'Please note, will destroy the messages from {contact} for the account {account} ') else: log.warn(f'Please note, will destroy {bold("ALL")} messages of the account {account}') if not ask("Would you like to continue?"): sys.exit() - from sylk.applications.webrtcgateway.configuration import CassandraConfig + from sylk.applications.webrtcgateway.configuration import CassandraConfig, FileStorageConfig from sylk.applications.webrtcgateway.models.storage.cassandra import ChatMessage from sylk.applications.webrtcgateway.storage import MessageStorage from sipsimple.threading import ThreadManager log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 if contact is not None: storage = MessageStorage() storage.load() storage.removeChat(account, contact) ThreadManager().stop() log.info(f'Messages from {contact} for {account} removed') sys.exit() configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [ChatMessage] for table in tables: table.__keyspace__ = CassandraConfig.keyspace try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: size = len(messages) for message in messages: message.delete() log.info(f'Removed {size} messages from {account}') else: - storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') + storage_path = os.path.join(FileStorageConfig.storage_dir, 'conversations') if os.path.exists(os.path.join(storage_path, account[0], f'{account}_messages.json')): os.remove(os.path.join(storage_path, account[0], f'{account}_messages.json')) log.info(f'Messages for {account} removed') def remove_data(account, data_type): data_types = ['push_token', 'api_token', 'public_key'] if data_type not in data_types: return log.info(f"\n{' SylkServer - Remove '+ data_type + ' ':*^80s}\n") log.warn(f'Please note, this will remove the {data_type} the account {account}') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace if data_type == 'api_token': try: accounts = ChatAccount.objects(ChatAccount.account == account) except LWTException: pass else: size = len(accounts) for acc in accounts: acc.api_token = '' acc.save(); log.info(f'Api token for {account} removed') return if data_type == 'push_token': username, domain = account.split('@') try: push_tokens = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain) except LWTException: pass else: size = len(push_tokens) log.info(f'{bold(str(size))} push token(s) stored\n') if size >= 2: for idx, token in enumerate(push_tokens): log.info(f'[{idx}]: {"App:":>13s} {token.app_id}\n{"Device ID:":>18s} {token.device_id}\n{"Token:":>18s} {token.device_token[:20]} ...\n') num = ask_num("Pick a token to delete", size-1) if num == 'all': for token in push_tokens: log.info(f'Removed token {token.device_token[:20]} from {account}') token.delete() else: log.info(f'Removed {push_tokens[num].device_token[:20]} from {account}') push_tokens[num].delete() else: for token in push_tokens: log.info(f'Removed token {token.device_token[:20]} from {account}') token.delete() return if data_type == 'public_key': try: public_keys = PublicKey.objects(PublicKey.account == account) except LWTException: pass else: size = len(public_keys) for key in public_keys: key.delete() pass log.info(f'Removed public Key from {account}') return else: log.warning('The python Cassandra drivers are not installed.') log.warning('We will use the JSON and pickle backend to wipe the account data') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.storage import TokenStorage, MessageStorage from sipsimple.threading import ThreadManager import time log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 if data_type == 'api_token': storage = MessageStorage() storage.load() time.sleep(.5) storage.remove_account_token(account) log.info(f'Api token for {account} removed') return if data_type == 'push_token': storage = TokenStorage() storage.load() time.sleep(.5) push_tokens = storage[account] if push_tokens: size = len(push_tokens) log.info(f'{bold(str(size))} push token(s) stored\n') if size >= 2: for idx, token in enumerate(push_tokens): token = push_tokens[token] log.info(f"[{idx}]: {'App:':>13s} {token['app_id']}\n{'Device ID:':>18s} {token['device_id']}\n{'Token:':>18s} {token['device_token'][:20]} ...\n") num = ask_num("Pick a token to delete", size - 1) if num == 'all': for token in push_tokens: token = push_tokens[token] log.info(f"Removed token {token['device_token'][:20]} from {account}") storage.remove(account, token['app_id'], token['device_id']) else: for idx, token in enumerate(push_tokens): token = push_tokens[token] if idx == num: log.info(f"Removed {token['device_token'][:20]} from {account}") storage.remove(account, token['app_id'], token['device_id']) else: for token in push_tokens: token = push_tokens[token] log.info(f"Removed token {token['device_token'][:20]} from {account}") storage.remove(account, token['app_id'], token['device_id']) return if data_type == 'public_key': storage = TokenStorage() storage.load() time.sleep(.5) storage.remove_public_key(account) log.info(f'Removed public Key from {account}') return def show(account): log.info(f"\n{' SylkServer - Show account data ':*^80s}\n") from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace log.info(f"\n{'-'*80}\n{ account :^80s}\n{'-'*80}") try: accounts = ChatAccount.objects(ChatAccount.account == account) except LWTException: pass else: if not len(accounts): log.info(f'Message storage is {bold("not")} enabled') else: log.info(f'{green("Message storage is")} {bold_green("enabled")}') for acc in accounts: if acc.api_token: row = session.execute(f'SELECT TTL(api_token) as ttl FROM {ChatAccount.__keyspace__}.{ChatAccount.__table_name__} where account=\'{acc.account}\'') log.info(f"{'API token for replication:':26s} {bold(str(acc.api_token))}\n{'TTL:':>26s} {row.one()['ttl']}\n") log.info(f'{"Last login at: "} {acc.last_login}\n') try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: size = len(messages) log.info(f'{bold(str(size))} messages stored') text_messages = [message for message in messages if message.content_type.startswith('text')] imdn_messages = [message for message in messages if message.content_type.startswith('message/imdn')] other_messages = [message for message in messages if not message.content_type.startswith('message/imdn') and not message.content_type.startswith('text')] log.info(f'\n{"Text Messages:":>23s} {len(text_messages)}\n{"IMDN messages:":>23s} {len(imdn_messages)}\n{"Other Messages:":>23s} {len(other_messages)}\n') unread_messages = [message for message in messages if ((message.content_type == 'text/plain' or message.content_type == 'text/html') and message.direction == 'incoming' and message.contact != account and 'display' in message.disposition)] log.info(f'{"Unread text Messages:":>23s} {len(unread_messages)}\n') username, domain = account.split('@') try: push_tokens = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain) except LWTException: pass else: size = len(push_tokens) log.info(f'{bold(str(size))} push token(s) stored\n') for token in push_tokens: log.info(f'{"App:":>18s} {token.app_id}\n{"Device ID:":>18s} {token.device_id}\n{"Token:":>18s} {token.device_token[:20]} ...\n') # from sylk.applications.webrtcgateway.storage import FileTokenStorage # storage = FileTokenStorage() # test = {} # test['pn_device'] = token.device_id # test['pn_silent'] = token.silent # test['pn_tok'] = f'{token.device_token}-{token.background_token}' # test['pn_app'] = token.app_id # test['pn_type'] = token.platform # storage.load() # import time # time.sleep(.5) # print(test) # storage.add(account, test, token.user_agent) try: public_keys = PublicKey.objects(PublicKey.account == account) except LWTException: pass else: size = len(public_keys) log.info(f'{bold(str(size))} public key(s) stored\n') log.Formatter.prefix_format = parse_level_indent log.Formatter.prefix_length = 0 for key in public_keys: log.info(f'{key.public_key}') log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 else: log.warning('The python Cassandra drivers are not installed.') log.warning('We will use the JSON and pickle backend for the account data') import time from twisted.internet import defer from sylk.applications.webrtcgateway.storage import TokenStorage, MessageStorage from sipsimple.threading import ThreadManager log.info(f"\n{'-'*80}\n{ account :^80s}\n{'-'*80}") def print_messages(messages): size = len(messages) log.info(f'{bold(str(size))} messages stored') text_messages = [message for message in messages if message['content_type'].startswith('text')] imdn_messages = [message for message in messages if message['content_type'].startswith('message/imdn')] other_messages = [message for message in messages if not message['content_type'].startswith('message/imdn') and not message['content_type'].startswith('text')] log.info(f'\n{"Text Messages:":>23s} {len(text_messages)}\n{"IMDN messages:":>23s} {len(imdn_messages)}\n{"Other Messages:":>23s} {len(other_messages)}\n') unread_messages = [message for message in messages if ((message['content_type'] == 'text/plain' or message['content_type'] == 'text/html') and message['direction'] == 'incoming' and message['contact'] != account and 'display' in message['disposition'])] log.info(f'{"Unread text Messages:":>23s} {len(unread_messages)}\n') storage = MessageStorage() storage.load() time.sleep(.5) acc = storage.get_account(account) public_key = storage.get_public_key(account) if not acc: log.info(f'Message storage is {bold("not")} enabled') else: log.info(f'{green("Message storage is")} {bold_green("enabled")}') token = storage.get_account_token(account) if token: ttl = storage._accounts[account]['token_expire'] log.info(f"{'API token for replication:':26s} {bold(str(token))}\n{'TTL:':>26s} {ttl}") last_login = storage._accounts[account]['last_login'] log.info(f'{"Last login at: "} {last_login}\n') msg = storage[[account, None]] if isinstance(msg, defer.Deferred): msg.addCallback(lambda result: print_messages(result)) storage = TokenStorage() storage.load() time.sleep(.5) push_tokens = storage[account] if push_tokens: size = len(push_tokens) log.info(f'{bold(str(size))} push token(s) stored\n') for token in push_tokens: token = push_tokens[token] log.info(f"{'App:':>18s} {token['app_id']}\n{'Device ID:':>18s} {token['device_id']}\n{'Token:':>18s} {token['device_token'][:20]} ...\n") size = 0 if public_key: size = 1 log.info(f'{bold(str(size))} public key(s) stored\n') log.Formatter.prefix_format = parse_level_indent log.Formatter.prefix_length = 0 if public_key: log.info(f'{public_key}') log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 time.sleep(1) ThreadManager().stop() if __name__ == '__main__': process.configuration.subdirectory = 'sylkserver' parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.') parser.add_argument("--config-dir", dest='config_directory', default=None, metavar='PATH', help="Set a config directory.") subparsers = parser.add_subparsers(dest='action') dbinit = subparsers.add_parser('dbinit', help='initialize/Update database (default)') remove = subparsers.add_parser('remove', help='remove data from an account') subparsers_remove = remove.add_subparsers(dest='remove_action') remove_all = subparsers_remove.add_parser('all', help='remove all data from an account') remove_all.add_argument('account', help='Account') remove_messages_sub = subparsers_remove.add_parser('messages', help='remove all messages from an account') remove_messages_sub.add_argument('account', help='Account') remove_messages_sub.add_argument('contact', nargs='?', help='optional contact to remove messages from') remove_api_token = subparsers_remove.add_parser('api_token', help='remove api token from an account') remove_api_token.add_argument('account', help='Account') remove_push_token = subparsers_remove.add_parser('push_token', help='remove push token(s) from an account') remove_push_token.add_argument('account', help='Account') remove_public_key = subparsers_remove.add_parser('public_key', help='remove public key from an account') remove_public_key.add_argument('account', help='Account') show_data = subparsers.add_parser('show', help='show all data from an account') show_data.add_argument('account', help='Account') options = parser.parse_args() if options.config_directory is not None: process.configuration.local_directory = options.config_directory log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 from sylk.server import ServerConfig log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 if not options.action: parser.print_help() sys.exit() configuration = ServerConfig.__cfgtype__(ServerConfig.__cfgfile__) if configuration.files: log.info('Reading configuration from {}'.format(', '.join(configuration.files))) else: log.info('Not reading any configuration files (using internal defaults)') if options.action == 'remove': if options.remove_action == 'all': remove_account(options.account) elif options.remove_action == 'remove_messages': remove_messages(options.account, options.contact) elif options.remove_action in ['api_token', 'push_token', 'public_key']: remove_data(options.account, options.remove_action) elif options.action == 'show': show(options.account) elif options.action == 'dbinit': db_init() diff --git a/sylk/applications/webrtcgateway/__init__.py b/sylk/applications/webrtcgateway/__init__.py index f925c87..f62666d 100644 --- a/sylk/applications/webrtcgateway/__init__.py +++ b/sylk/applications/webrtcgateway/__init__.py @@ -1,266 +1,265 @@ import os import time import errno import re from application.notification import IObserver, NotificationCenter from application.python import Null from application.system import unlink from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.streams.msrp.chat import CPIMPayload, CPIMParserError from sipsimple.threading import run_in_thread from twisted.internet import defer, reactor from zope.interface import implementer from sylk.applications import SylkApplication from sylk.session import IllegalStateError from .configuration import GeneralConfig from .datatypes import FileTransferData from .sip_handlers import FileTransferHandler, MessageHandler from .logger import log from .storage import TokenStorage, MessageStorage from .web import WebHandler, AdminWebHandler @implementer(IObserver) class WebRTCGatewayApplication(SylkApplication): def __init__(self): self.web_handler = WebHandler() self.admin_web_handler = AdminWebHandler() def start(self): self.web_handler.start() self.admin_web_handler.start() # Load tokens from the storage token_storage = TokenStorage() token_storage.load() # Setup message storage message_storage = MessageStorage() message_storage.load() self.clean_filetransfers() def stop(self): self.web_handler.stop() self.admin_web_handler.stop() @run_in_thread('file-io') def clean_filetransfers(self): - settings = SIPSimpleSettings() - top = settings.file_transfer.directory.normalized + top = GeneralConfig.file_transfer_dir.normalized removed_dirs = removed_files = 0 for root, dirs, files in os.walk(top, topdown=False): for name in files: file = os.path.join(root, name) statinfo = os.stat(file) current_time = time.time() remove_after_days = GeneralConfig.filetransfer_expire_days if (statinfo.st_size >= 1024 * 1024 * 50 and statinfo.st_mtime < current_time - 86400 * remove_after_days): log.info(f"[housekeeper] Removing expired filetransfer file: {file}") removed_files += 1 unlink(file) elif statinfo.st_mtime < current_time - 86400 * 2 * remove_after_days: log.info(f"[housekeeper] Removing expired file transfer file: {file}") removed_files += 1 unlink(file) for name in dirs: dir = os.path.join(root, name) try: os.rmdir(dir) except OSError as ex: if ex.errno == errno.ENOTEMPTY: pass else: removed_dirs += 1 log.info(f"[housekeeper] Removing expired file transfer dir {dir}") log.info(f"[housekeeper] Removed {removed_files} files, {removed_dirs} directories") reactor.callLater(3600, self.clean_filetransfers) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def incoming_session(self, session): # TODO: handle diverted sessions? log.info('New incoming session {session.call_id} from sip:{uri.user}@{uri.host}'.format(session=session, uri=session.remote_identity.uri)) transfer_streams = [stream for stream in session.proposed_streams if stream.type == 'file-transfer'] if not transfer_streams: session.reject(488) return transfer_stream = transfer_streams[0] sender = f'{session.remote_identity.uri.user}@{session.remote_identity.uri.host}' receiver = f'{session.local_identity.uri.user}@{session.local_identity.uri.host}' file_selector = transfer_stream.file_selector if transfer_stream.direction == 'sendonly': transfer_data = FileTransferData(file_selector.name, file_selector.size, file_selector.type, transfer_stream.transfer_id, receiver, sender) session.transfer_data = transfer_data message_storage = MessageStorage() account = defer.maybeDeferred(message_storage.get_account, receiver) account.addCallback(lambda result: self._check_receiver_session(result)) sender_account = defer.maybeDeferred(message_storage.get_account, sender) sender_account.addCallback(lambda result: self._check_sender_session(result, session)) d1 = defer.DeferredList([account, sender_account], consumeErrors=True) d1.addCallback(lambda result: self._handle_pull_transfer(result, session, transfer_stream)) return transfer_data = FileTransferData(file_selector.name, file_selector.size, file_selector.type, transfer_stream.transfer_id, sender, receiver) session.transfer_data = transfer_data message_storage = MessageStorage() account = defer.maybeDeferred(message_storage.get_account, receiver) account.addCallback(lambda result: self._check_receiver_session(result)) sender_account = defer.maybeDeferred(message_storage.get_account, sender) sender_account.addCallback(lambda result: self._check_sender_session(result, session)) d1 = defer.DeferredList([account, sender_account], consumeErrors=True) d1.addCallback(lambda result: self._handle_lookup_result(result, session, transfer_stream)) NotificationCenter().add_observer(self, sender=session) def _check_receiver_session(self, account): if account is None: raise Exception("Receiver account for filetransfer not found") def _check_sender_session(self, account, session): if account is None: session.transfer_data.update_path_for_receiver() raise Exception("Sender account for filetransfer not found") def _handle_lookup_result(self, result, session, stream): reject_session = all([success is not True for (success, value) in result]) if reject_session: self._reject_session(session, "Sender and receiver accounts for filetransfer were not found") return stream.handler.save_directory = session.transfer_data.path log.info('File transfer from {sender.uri} to {receiver.uri} will be saved to {path}/{filename}'.format(**session.transfer_data.__dict__)) self._accept_session(session, [stream]) def _handle_pull_transfer(self, result, session, stream): reject_session = all([success is not True for (success, value) in result]) if reject_session: self._reject_session(session, "Sender and receiver accounts for filetransfer were not found") return transfer_data = session.transfer_data original_filename = transfer_data.filename if not os.path.exists(transfer_data.full_path): filename, ext = os.path.splitext(transfer_data.filename) filename = re.sub(r"-[0-9]$", '', filename) transfer_data.filename = f'{filename}{ext}' if not os.path.exists(transfer_data.full_path): transfer_data.filename = original_filename transfer_data.update_path_for_receiver() if not os.path.exists(transfer_data.full_path): filename, ext = os.path.splitext(transfer_data.filename) filename = re.sub(r"-[0-9]$", '', filename) transfer_data.filename = f'{filename}{ext}' if os.path.exists(transfer_data.full_path): meta_filepath = os.path.join(transfer_data.path, f'meta-{transfer_data.filename}') import json from sipsimple.streams.msrp.filetransfer import FileSelector try: with open(meta_filepath, 'r') as meta_file: metadata = json.load(meta_file) except (OSError, IOError,ValueError): log.warning('Could load metadata %s' % meta_filepath) session.reject(404) return if stream.file_selector.hash == metadata['hash']: file_selector_disk = FileSelector.for_file(os.path.join(transfer_data.path, transfer_data.filename)) stream.file_selector = file_selector_disk NotificationCenter().add_observer(self, sender=session) self._accept_session(session, [stream]) return else: log.warning(f'File transfer rejected, {original_filename} file is not found') session.reject(500) def _reject_session(self, session, error): log.warning(f'File transfer rejected: {error}') session.reject(404) def _accept_session(self, session, streams): try: session.accept(streams) except IllegalStateError: session.reject(500) def incoming_subscription(self, request, data): request.reject(405) def incoming_referral(self, request, data): request.reject(405) def incoming_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return if not data.body: log.warning('SIP message from %s to %s rejected: empty body' % (from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_request.answer(400) return if content_type == 'message/cpim': try: cpim_message = CPIMPayload.decode(data.body) except (CPIMParserError, UnicodeDecodeError): # TODO: fix decoding in sipsimple log.warning('SIP message from %s to %s rejected: CPIM parse error' % (from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_request.answer(400) return else: content_type = cpim_message.content_type log.info('received SIP message (%s) from %s to %s' % (content_type, from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_request.answer(200) message_handler = MessageHandler() message_handler.incoming_message(data) def _NH_SIPSessionDidStart(self, notification): session = notification.sender try: transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: transfer_handler = FileTransferHandler() transfer_handler.init_incoming(transfer_stream) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) def _NH_SIPSessionDidFail(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) log.info('File transfer from %s to %s failed: %s (%s)' % (session.remote_identity.uri, session.local_identity.uri, notification.data.reason, notification.data.failure_reason)) diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py index baab08f..6c77aea 100644 --- a/sylk/applications/webrtcgateway/configuration.py +++ b/sylk/applications/webrtcgateway/configuration.py @@ -1,232 +1,249 @@ import os import re from application.configuration import ConfigFile, ConfigSection, ConfigSetting from application.configuration.datatypes import NetworkAddress, StringList, HostnameList from sylk.configuration import ServerConfig from sylk.configuration.datatypes import Path, SIPProxyAddress, VideoBitrate, VideoCodec +from sylk.resources import VarResources __all__ = 'GeneralConfig', 'JanusConfig', 'get_room_config', 'ExternalAuthConfig', 'get_auth_config', 'CassandraConfig' # Datatypes class AccessPolicyValue(str): allowed_values = ('allow,deny', 'deny,allow') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class Domain(str): domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$") def __new__(cls, value): value = str(value) if not cls.domain_re.match(value): raise ValueError("illegal domain: %s" % value) return str.__new__(cls, value) class SIPAddress(str): def __new__(cls, address): address = str(address) address = address.replace('@', '%40', address.count('@')-1) try: username, domain = address.split('@') Domain(domain) except ValueError: raise ValueError("illegal SIP address: %s, must be in user@domain format" % address) return str.__new__(cls, address) class PolicyItem(object): def __new__(cls, item): lowercase_item = item.lower() if lowercase_item in ('none', ''): return 'none' elif lowercase_item in ('any', 'all', '*'): return 'all' elif '@' in item: return SIPAddress(item) else: return Domain(item) class PolicySettingValue(object): def __init__(self, value): if isinstance(value, (tuple, list)): items = [str(x) for x in value] elif isinstance(value, str): items = re.split(r'\s*,\s*', value) else: raise TypeError("value must be a string, list or tuple") self.items = {PolicyItem(item) for item in items} self.items.discard('none') def __repr__(self): return '{0.__class__.__name__}({1})'.format(self, sorted(self.items)) def match(self, uri): if 'all' in self.items: return True elif not self.items: return False uri = re.sub('^(sip:|sips:)', '', str(uri)) domain = uri.split('@')[-1] return uri in self.items or domain in self.items class ManagementInterfaceAddress(NetworkAddress): default_port = 20888 class AuthType(str): allowed_values = ('SIP', 'IMAP') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class SIPAddressList(object): """A list of SIP uris separated by commas""" def __new__(cls, value): if isinstance(value, (tuple, list)): return [SIPAddress(x) for x in value] elif isinstance(value, str): if value.lower() in ('none', ''): return [] items = re.split(r'\s*,\s*', value) items = {SIPAddress(item) for item in items} return items else: raise TypeError('value must be a string, list or tuple') # Configuration objects +class ApplicationConfig(ConfigSection): + __cfgfile__ = 'webrtcgateway.ini' + __section__ = 'General' + + application_dir = ConfigSetting(type=Path, value=Path(VarResources.get('lib/sylkserver/webrtcgateway'))) + + class GeneralConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'General' web_origins = ConfigSetting(type=StringList, value=['*']) sip_domains = ConfigSetting(type=StringList, value=['*']) outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trace_client = False websocket_ping_interval = 120 + application_dir = ApplicationConfig.application_dir recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings'))) filesharing_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'files'))) + file_transfer_dir = ConfigSetting(type=Path, value=Path(os.path.join(ApplicationConfig.application_dir.normalized, 'file_transfers'))) http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1')) http_management_auth_secret = ConfigSetting(type=str, value=None) sylk_push_url = ConfigSetting(type=str, value=None) local_sip_messages = False filetransfer_expire_days = 15 class JanusConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Janus' api_url = 'ws://127.0.0.1:8188' api_secret = '0745f2f74f34451c89343afcdcae5809' trace_janus = False max_bitrate = ConfigSetting(type=VideoBitrate, value=VideoBitrate(2016000)) # ~2 MBits/s video_codec = ConfigSetting(type=VideoCodec, value=VideoCodec('vp9')) decline_code = 486 class CassandraConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Cassandra' cluster_contact_points = ConfigSetting(type=HostnameList, value=None) keyspace = ConfigSetting(type=str, value='') push_tokens_table = ConfigSetting(type=str, value='') +class FileStorageConfig(ConfigSection): + __cfgfile__ = 'webrtcgateway.ini' + __section__ = 'FileStorage' + + storage_dir = ConfigSetting(type=Path, value=Path(os.path.join(GeneralConfig.application_dir.normalized, 'storage'))) + + class RoomConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' record = False access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) max_bitrate = ConfigSetting(type=VideoBitrate, value=JanusConfig.max_bitrate) video_codec = ConfigSetting(type=VideoCodec, value=JanusConfig.video_codec) video_disabled = False invite_participants = ConfigSetting(type=SIPAddressList, value=[]) persistent = False class VideoroomConfiguration(object): video_codec = 'vp9' max_bitrate = 2016000 record = False recording_dir = None filesharing_dir = None def __init__(self, data): self.__dict__.update(data) @property def janus_data(self): return dict(videocodec=self.video_codec, bitrate=self.max_bitrate, record=self.record, rec_dir=self.recording_dir) def get_room_config(room): config_file = ConfigFile(RoomConfig.__cfgfile__) section = config_file.get_section(room) if section is not None: RoomConfig.read(section=room) config = VideoroomConfiguration(dict(RoomConfig)) RoomConfig.reset() else: config = VideoroomConfiguration(dict(RoomConfig)) # use room defaults config.recording_dir = os.path.join(GeneralConfig.recording_dir, room) config.filesharing_dir = os.path.join(GeneralConfig.filesharing_dir, room) return config class ExternalAuthConfig(ConfigSection): __cfgfile__ = 'auth.ini' __section__ = 'ExternalAuth' enable = False # this can't be per-server due to limitations in imaplib imap_ca_cert_file = ConfigSetting(type=str, value='/etc/ssl/certs/ca-certificates.crt') class AuthConfig(ConfigSection): __cfgfile__ = 'auth.ini' auth_type = ConfigSetting(type=AuthType, value=AuthType('SIP')) imap_server = ConfigSetting(type=str, value='') class AuthConfiguration(object): auth_type = AuthType('SIP') def __init__(self, data): self.__dict__.update(data) def get_auth_config(domain): config_file = ConfigFile(AuthConfig.__cfgfile__) section = config_file.get_section(domain) if section is not None: AuthConfig.read(section=domain) config = AuthConfiguration(dict(AuthConfig)) AuthConfig.reset() else: config = AuthConfiguration(dict(AuthConfig)) # use auth defaults return config diff --git a/sylk/applications/webrtcgateway/datatypes.py b/sylk/applications/webrtcgateway/datatypes.py index b43d8ff..22dba52 100644 --- a/sylk/applications/webrtcgateway/datatypes.py +++ b/sylk/applications/webrtcgateway/datatypes.py @@ -1,133 +1,129 @@ import base64 import hashlib import json import os from datetime import datetime, timedelta import urllib.parse from sipsimple.util import ISOTimestamp -from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.payloads.rcsfthttp import FTHTTPDocument, FileInfo from sipsimple.streams.msrp.chat import CPIMPayload, ChatIdentity, CPIMHeader, CPIMNamespace from sylk.web import server from sylk.configuration.datatypes import URL from .configuration import GeneralConfig from .models import sylkrtc class FileTransferData(object): def __init__(self, filename, filesize, filetype, transfer_id, sender, receiver, content=None, url=None): self.content = content self.filename = filename self.filesize = filesize self.filetype = filetype self.transfer_id = transfer_id self.sender = sylkrtc.SIPIdentity(uri=sender, display_name='') self.receiver = sylkrtc.SIPIdentity(uri=receiver, display_name='') self.hashed_sender = sender self.hashed_receiver = receiver self.prefix = self.hashed_sender[:1] self.path = self._get_initial_path() self.timestamp = str(ISOTimestamp.now()) self.until = self._set_until() self.url = self._set_url() if not url else url def _encode_and_hash_uri(self, uri): return base64.urlsafe_b64encode(hashlib.md5(uri.encode('utf-8')).digest()).rstrip(b'=\n').decode('utf-8') def _get_initial_path(self): - settings = SIPSimpleSettings() - return os.path.join(settings.file_transfer.directory.normalized, self.prefix, self.hashed_sender, self.hashed_receiver, self.transfer_id) + return os.path.join(GeneralConfig.file_transfer_dir.normalized, self.prefix, self.hashed_sender, self.hashed_receiver, self.transfer_id) def update_path_for_receiver(self): - settings = SIPSimpleSettings() self.prefix = self.hashed_receiver[:1] - self.path = os.path.join(settings.file_transfer.directory.normalized, self.prefix, self.hashed_receiver, self.hashed_sender, self.transfer_id) + self.path = os.path.join(GeneralConfig.file_transfer_dir.normalized, self.prefix, self.hashed_receiver, self.hashed_sender, self.transfer_id) self.url = self._set_url() def _set_until(self): remove_after_days = GeneralConfig.filetransfer_expire_days return str(ISOTimestamp(datetime.now() + timedelta(days=remove_after_days))) def _set_url(self): - settings = SIPSimpleSettings() - stripped_path = os.path.relpath(self.path, f'{settings.file_transfer.directory.normalized}/{self.prefix}') + stripped_path = os.path.relpath(self.path, f'{GeneralConfig.file_transfer_dir.normalized}/{self.prefix}') file_path = urllib.parse.quote(f'webrtcgateway/filetransfer/{stripped_path}/{self.filename}') return str(URL(f'{server.url}/{file_path}')) @property def full_path(self): return os.path.join(self.path, self.filename) @property def hashed_sender(self): return self._hashed_sender @hashed_sender.setter def hashed_sender(self, value): self._hashed_sender = self._encode_and_hash_uri(value) @property def hashed_receiver(self): return self._hashed_receiver @hashed_receiver.setter def hashed_receiver(self, value): self._hashed_receiver = self._encode_and_hash_uri(value) @property def message_payload(self): return f'File transfer available at {self.url} ({self.formatted_file_size})' def cpim_message_payload(self, metadata): return self.build_cpim_payload(self.sender.uri, self.receiver.uri, self.transfer_id, json.dumps(sylkrtc.FileTransferMessage(**metadata.__data__).__data__), content_type='application/sylk-file-transfer') def cpim_rcsfthttp_message_payload(self, metadata): return self.build_cpim_payload(self.sender.uri, self.receiver.uri, self.transfer_id, FTHTTPDocument.create(file=[FileInfo(file_size=metadata.filesize, file_name=metadata.filename, content_type=metadata.filetype, url=metadata.url, until=metadata.until, hash=metadata.hash)]), content_type=FTHTTPDocument.content_type) @property def formatted_file_size(self): return self.format_file_size(self.filesize) @staticmethod def build_cpim_payload(account, uri, message_id, content, content_type='text/plain'): ns = CPIMNamespace('urn:ietf:params:imdn', 'imdn') additional_headers = [CPIMHeader('Message-ID', ns, message_id)] additional_headers.append(CPIMHeader('Disposition-Notification', ns, 'positive-delivery, display')) payload = CPIMPayload(content, content_type, charset='utf-8', sender=ChatIdentity(account, None), recipients=[ChatIdentity(uri, None)], timestamp=str(ISOTimestamp.now().replace(microsecond=0)), additional_headers=additional_headers) payload, content_type = payload.encode() return payload @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index cdd06ed..9caa842 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,766 +1,766 @@ import datetime import json import pickle as pickle import os from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from sipsimple.threading import run_in_thread from sipsimple.util import ISOTimestamp from shutil import rmtree from twisted.internet import defer from types import SimpleNamespace from sylk.configuration import ServerConfig -from .configuration import CassandraConfig +from .configuration import CassandraConfig, FileStorageConfig from .datatypes import FileTransferData from .errors import StorageError from .logger import log __all__ = 'TokenStorage', # TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, # and thus it's ok to scrap it after a reasonable amount of time. CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.policies import DCAwareRoundRobinPolicy from .models.storage.cassandra import PushTokens from .models.storage.cassandra import ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table class FileTokenStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): - with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: + with open(os.path.join(FileStorageConfig.storage_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: - tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) + tokens = pickle.load(open(os.path.join(FileStorageConfig.storage_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params, user_agent): try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: token = contact_params['pn_tok'] background_token = None data = { 'device_id': contact_params['pn_device'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], 'app_id': contact_params['pn_app'], 'user_agent': user_agent, 'background_token': background_token, 'device_token': token } key = f"{data['app_id']}-{data['device_id']}" if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} # Remove old storage layout based on device id if contact_params['pn_device'] in self._tokens[account]: del self._tokens[account][contact_params['pn_device']] # Remove old storage layout based on token if token in self._tokens[account]: del self._tokens[account][token] # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: del self._tokens[account][contact_params['pn_tok']] except (IndexError, KeyError): pass self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, app_id, device_id): key = f'{app_id}-{device_id}' try: del self._tokens[account][key] except KeyError: pass self._save() def removeAll(self, account): try: del self._tokens[account] except KeyError: pass self._save() class CassandraConnection(object, metaclass=Singleton): @run_in_thread('cassandra') def __init__(self): try: from cassandra.io import twistedreactor self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=twistedreactor.TwistedConnection) except NoHostAvailable: log.error("Not able to connect to any of the Cassandra contact points") raise StorageError class CassandraTokenStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'silent': device.silent, 'app_id': device.app_id, 'background_token': device.background_token} deferred.callback(tokens) return tokens query_tokens(key) return deferred @run_in_thread('cassandra') def add(self, account, contact_params, user_agent): username, domain = account.split('@', 1) token = contact_params['pn_tok'] background_token = None if contact_params['pn_type'] == 'ios': try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: pass # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: old_token = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain)[0] except (IndexError, LWTException): pass else: if old_token.device_token == contact_params['pn_tok']: old_token.delete() try: PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], device_token=token, background_token=background_token, platform=contact_params['pn_type'], silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing token failed: {e}') raise StorageError @run_in_thread('cassandra') def remove(self, account, app_id, device_id): username, domain = account.split('@', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass class FileMessageStorage(object): def __init__(self): self._public_keys = defaultdict() self._accounts = defaultdict() - self._storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') + self._storage_path = os.path.join(FileStorageConfig.storage_dir, 'conversations') def _json_dateconverter(self, o): if isinstance(o, datetime.datetime): return o.__str__() @run_in_thread('file-io') def _save(self): with open(os.path.join(self._storage_path, 'accounts.json'), 'w+') as f: json.dump(self._accounts, f, default=self._json_dateconverter) with open(os.path.join(self._storage_path, 'public_keys.json'), 'w+') as f: json.dump(self._public_keys, f, default=self._json_dateconverter) def _save_messages(self, account, messages): with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'w+') as f: json.dump(messages, f, default=self._json_dateconverter, indent=4) def _save_id_by_timestamp(self, account, ids): with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'w+') as f: json.dump(ids, f, default=self._json_dateconverter) @run_in_thread('file-io') def load(self): makedirs(self._storage_path) try: accounts = json.load(open(os.path.join(self._storage_path, 'accounts.json'), 'r')) except (OSError, IOError): pass else: self._accounts.update(accounts) try: public_keys = json.load(open(os.path.join(self._storage_path, 'public_keys.json'), 'r')) except (OSError, IOError): pass else: self._public_keys.update(public_keys) def _load_id_by_timestamp(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'r') as f: data = json.load(f) return data except (OSError, IOError) as e: raise e def _load_messages(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'r') as f: messages = json.load(f) return messages except (OSError, IOError) as e: raise e def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('file-io') def query(account, message_id, since): messages = [] timestamp = None try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): deferred.callback(messages) return if message_id is not None: try: timestamp = id_by_timestamp[message_id] except KeyError: deferred.callback(messages) return else: timestamp = ISOTimestamp(timestamp) if since and not message_id: timestamp = ISOTimestamp(since) try: messages = self._load_messages(account) except (OSError, IOError): deferred.callback(messages) return else: if timestamp is not None: messages = [message for message in messages if ISOTimestamp(message['created_at']) > timestamp] deferred.callback(messages) else: deferred.callback(messages) try: since = key[2] except IndexError: since = None query(key[0], key[1], since) return deferred def get_account(self, account): try: return SimpleNamespace(account=account, **self._accounts[account]) except KeyError: return None def get_account_token(self, account): try: if datetime.datetime.now() < datetime.datetime.fromisoformat(str(self._accounts[account]['token_expire'])): return self._accounts[account]['api_token'] return None except KeyError: return None def remove_account_token(self, account): if account in self._accounts: self._accounts[account]['api_token'] = '' self._save() def add_account(self, account): timestamp = datetime.datetime.now() if account not in self._accounts: self._accounts[account] = {'last_login': timestamp} self._save() else: self._accounts[account]['last_login'] = timestamp self._save() def get_public_key(self, account): try: return self._public_keys[account]['content'] except KeyError: return None def remove_account(self, account): if account in self._accounts: del self._accounts[account] self._save() def remove_public_key(self, account): if account in self._public_keys: del self.public_keys[account] self._save() def add_account_token(self, account, token): timestamp = datetime.datetime.now() if account not in self._accounts: log.error(f'Updating API token for {account} failed') return StorageError self._accounts[account]['api_token'] = token self._accounts[account]['token_expire'] = timestamp + datetime.timedelta(seconds=26784000) self._save() @run_in_thread('file-io') def mark_conversation_read(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): return else: for idx, message in enumerate(messages): if message['contact'] == contact: message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) @run_in_thread('file-io') def update(self, account, state, message_id): try: messages = self._load_messages(account) except (OSError, IOError): return try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: for idx, message in enumerate(messages): if message['created_at'] == timestamp and message['message_id'] == message_id: if message['state'] != 'received': message['state'] = state if state == 'delivered': try: message['disposition'].remove('positive-delivery') except ValueError: pass elif state == 'displayed': message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) break @run_in_thread('file-io') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() messages = [] id_by_timestamp = {} try: messages = self._load_messages(account) except (OSError, IOError): makedirs(os.path.join(self._storage_path, account[0])) timestamp_not_found = True try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): pass else: try: created_at = id_by_timestamp[message_id] except KeyError: pass else: timestamp_not_found = False if content_type == 'message/imdn+json': return items = [n for n in messages if n['created_at'] == created_at and n['message_id'] == message_id] if len(items) == 1: return if content_type == 'text/pgp-public-key': key = account if direction == "outgoing" else contact self._public_keys[key] = {'content': content, 'created_at': timestamp} self._save() return if content_type == 'text/pgp-private-key': if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) return if not isinstance(disposition_notification, list) and disposition_notification == '': disposition_notification = [] message = {'account': account, 'direction': direction, 'contact': contact, 'content_type': content_type, 'content': content, 'created_at': timestamp, 'message_id': message_id, 'disposition': disposition_notification, 'state': state, 'msg_timestamp': msg_timestamp} messages.append(message) self._save_messages(account, messages) if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) @run_in_thread('file-io') def removeChat(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): pass else: messages[:] = [message for message in messages if message['contact'] != contact] self._save_messages(account, messages) @run_in_thread('file-io') def removeMessage(self, account, message_id): try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return False else: try: timestamp = id_by_timestamp[message_id] except KeyError: return False else: try: messages = self._load_messages(account) except (OSError, IOError): return else: item = [n for n in messages if n['created_at'] == timestamp and n['message_id'] == message_id] if len(item) == 1: if item[0]['content_type'] == 'application/sylk-file-transfer': data = FileTransferData('', '', '', item[0]['message_id'], item[0]['account'], item[0]['contact']) try: rmtree(data.path) except FileNotFoundError: pass else: print(f"Removed {data.path}") messages.remove(item[0]) self._save_messages(account, messages) return True return False class CassandraMessageStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_messages(key, message_id, since): messages = [] try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except (IndexError, InvalidRequest): timestamp = datetime.datetime.now() - datetime.timedelta(days=3) else: timestamp = timestamp.created_at if since and not message_id: timestamp = ISOTimestamp(since) for message in ChatMessage.objects(ChatMessage.account == key, ChatMessage.created_at > timestamp): timestamp_naive = message.msg_timestamp try: timestamp_utc = timestamp_naive.replace(tzinfo=datetime.timezone.utc) except AttributeError: continue message.msg_timestamp = timestamp_utc messages.append(message) deferred.callback(messages) try: since = key[2] except IndexError: since = None query_messages(key[0], key[1], since) return deferred def get_account(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account) query_tokens(account) return deferred def get_account_token(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account.api_token) query_tokens(account) return deferred @run_in_thread('cassandra') def add_account(self, account): timestamp = datetime.datetime.now() try: ChatAccount.create(account=account, last_login=timestamp) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing account failed: {e}') raise StorageError @run_in_thread('cassandra') def add_account_token(self, account, token): try: chat_account = ChatAccount.objects(account=account)[0] chat_account.ttl(2678400).update(api_token=token) except IndexError: log.error(f'Updating API token for {account} failed') raise StorageError @run_in_thread('cassandra') def mark_conversation_read(self, account, contact): for message in ChatMessage.objects(ChatMessage.account == account): if message.contact == contact: if message.content_type == 'application/sylk-conversation-read': message.delete() else: message.disposition.clear() message.save() def get_public_key(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_key(account): try: public_key = PublicKey.objects(PublicKey.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(public_key.public_key) query_key(account) return deferred @run_in_thread('cassandra') def update(self, account, state, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: message = ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, ChatMessage.message_id == message_id)[0] except IndexError: pass else: if message.state != 'received': message.state = state if state == 'delivered': try: message.disposition.remove('positive-delivery') except ValueError: pass elif state == 'displayed': message.disposition.clear() message.save() @run_in_thread('cassandra') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() timestamp_not_found = True try: created_at = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: pass else: timestamp_not_found = False timestamp = created_at.created_at if content_type == 'message/imdn+json': return if ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == created_at.created_at, ChatMessage.message_id == message_id).count() != 0: return if content_type == 'text/pgp-public-key': try: PublicKey.create(account=account if direction == "outgoing" else contact, public_key=content, created_at=timestamp) ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing public key failed: {e}') raise StorageError else: return if content_type == 'text/pgp-private-key': if timestamp_not_found: try: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest): pass return try: ChatMessage.create(account=account, direction=direction, contact=contact, content_type=content_type, content=content, created_at=timestamp, message_id=message_id, disposition=disposition_notification, state=state, msg_timestamp=msg_timestamp) if timestamp_not_found: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing message failed: {e}') raise StorageError @run_in_thread('cassandra') def removeChat(self, account, contact): try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: for message in messages: if message.contact == contact: message.delete() def removeMessage(self, account, message_id): deferred = defer.Deferred() @run_in_thread('cassandra') def remove(account, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: deferred.callback(False) return else: try: result = True messages = ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, # ChatMessage.message_id == message_id).if_exists().delete() ChatMessage.message_id == message_id) for message in messages: if message.content_type == 'application/sylk-file-transfer': data = FileTransferData('', '', '', message.message_id, message.account, message.contact) try: rmtree(data.path) except FileNotFoundError: pass else: print(f"Removed {data.path}") if not messages: result = False print('message not removed') else: print('message removed') messages.if_exists().delete() deferred.callback(result) except LWTException: pass remove(account, message_id) return deferred class TokenStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraTokenStorage() else: return FileTokenStorage() class MessageStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraMessageStorage() else: return FileMessageStorage() diff --git a/sylk/applications/webrtcgateway/web.py b/sylk/applications/webrtcgateway/web.py index 98c6606..44c6315 100644 --- a/sylk/applications/webrtcgateway/web.py +++ b/sylk/applications/webrtcgateway/web.py @@ -1,374 +1,372 @@ import json import os import hashlib from shutil import copyfileobj from application.system import makedirs from sipsimple.streams.msrp.filetransfer import FileSelector from application.python.types import Singleton from autobahn.twisted.resource import WebSocketResource -from sipsimple.configuration.settings import SIPSimpleSettings from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.web.server import Site from werkzeug.exceptions import Forbidden, NotFound from werkzeug.utils import secure_filename from sylk import __version__ as sylk_version from sylk.resources import Resources from sylk.web import File, Klein, StaticFileResource, server from .configuration import GeneralConfig, JanusConfig from .datatypes import FileTransferData from .factory import SylkWebSocketServerFactory from .janus import JanusBackend from .logger import log from .models import sylkrtc from .protocol import SYLK_WS_PROTOCOL from .sip_handlers import MessageHandler from .storage import TokenStorage, MessageStorage __all__ = 'WebHandler', 'AdminWebHandler' class FileUploadRequest(object): def __init__(self, shared_file, content): self.deferred = defer.Deferred() self.shared_file = shared_file self.content = content self.had_error = False class ApiTokenAuthError(Exception): pass class WebRTCGatewayWeb(object, metaclass=Singleton): app = Klein() def __init__(self, ws_factory): self._resource = self.app.resource() self._ws_resource = WebSocketResource(ws_factory) self._ws_factory = ws_factory @property def resource(self): return self._resource @app.route('/', branch=True) def index(self, request): return StaticFileResource(Resources.get('html/webrtcgateway/')) @app.route('/ws') def ws(self, request): return self._ws_resource @app.route('/filesharing///', methods=['OPTIONS', 'POST', 'GET']) def filesharing(self, request, conference, session_id, filename): conference_uri = conference.lower() if conference_uri in self._ws_factory.videorooms: videoroom = self._ws_factory.videorooms[conference_uri] if session_id in videoroom: request.setHeader('Access-Control-Allow-Origin', '*') request.setHeader('Access-Control-Allow-Headers', 'content-type') method = request.method.upper().decode() session = videoroom[session_id] if method == 'POST': def log_result(result): if isinstance(result, Failure): videoroom.log.warning('{file.uploader.uri} failed to upload {file.filename}: {error}'.format(file=upload_request.shared_file, error=result.value)) else: videoroom.log.info('{file.uploader.uri} has uploaded {file.filename}'.format(file=upload_request.shared_file)) return result filename = secure_filename(filename) filesize = int(request.getHeader('Content-Length')) shared_file = sylkrtc.SharedFile(filename=filename, filesize=filesize, uploader=dict(uri=session.account.id, display_name=session.account.display_name), session=session_id) session.owner.log.info('wants to upload file {filename} to video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) upload_request = FileUploadRequest(shared_file, request.content) videoroom.add_file(upload_request) upload_request.deferred.addBoth(log_result) return upload_request.deferred elif method == 'GET': filename = secure_filename(filename) session.owner.log.info('wants to download file {filename} from video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) try: path = videoroom.get_file(filename) except LookupError as e: videoroom.log.warning('{session.account.id} failed to download {filename}: {error}'.format(session=session, filename=filename, error=e)) raise NotFound() else: videoroom.log.info('{session.account.id} is downloading {filename}'.format(session=session, filename=filename)) request.setHeader('Content-Disposition', 'attachment;filename=%s' % filename) return File(path) else: return 'OK' raise Forbidden() @app.route('/filetransfer////', methods=['GET', 'POST', 'OPTIONS']) def filetransfer(self, request, sender, receiver, transfer_id, filename): request.setHeader('Access-Control-Allow-Origin', '*') request.setHeader('Access-Control-Allow-Headers', 'content-type') method = request.method.upper().decode() if method == 'POST': ip = request.getClientIP() connection_handlers = [connection.connection_handler for connection in self._ws_factory.connections if connection.peer.split(":")[1] == ip] sender_connection = next((connection_handler for connection_handler in connection_handlers if sender in connection_handler.accounts_map), False) if not sender_connection: raise Forbidden # TODO: Form support to support extra metadata? filename = secure_filename(filename) filesize = int(request.getHeader('Content-Length')) filetype = request.getHeader('Content-Type') if request.getHeader('Content-Type') else 'application/octet-stream' transfer_data = FileTransferData(filename, filesize, filetype, transfer_id, sender, receiver, content=request.content) message_storage = MessageStorage() account = defer.maybeDeferred(message_storage.get_account, receiver) account.addCallback(lambda result: self._check_receiver(result)) sender_account = defer.maybeDeferred(message_storage.get_account, sender) sender_account.addCallback(lambda result: self._check_sender(result, transfer_data)) d1 = defer.DeferredList([account, sender_account], consumeErrors=True) d1.addCallback(lambda result: self._handle_lookup_result(result, transfer_data, sender_connection)) return d1 elif method == 'GET': - settings = SIPSimpleSettings() - folder = os.path.join(settings.file_transfer.directory.normalized, sender[:1], sender, receiver, transfer_id) + folder = os.path.join(GeneralConfig.file_transfer_dir.normalized, sender[:1], sender, receiver, transfer_id) path = f'{folder}/{filename}' log_path = os.path.join(sender, receiver, transfer_id, filename) if os.path.exists(path): file_size = os.path.getsize(path) split_tup = os.path.splitext(path) file_extension = split_tup[1] render_type = 'inline' if file_extension and file_extension.lower() in ('.jpg', '.png', '.jpeg', '.gif') else 'attachment' request.setHeader('Content-Disposition', '%s;filename=%s' % (render_type, filename)) log.info('Web %s file download %s (%s)' % (render_type, log_path, FileTransferData.format_file_size(file_size))) return File(path) else: log.warning('Download failed, file not found: %s' % (log_path)) raise NotFound() else: return 'OK' def _check_receiver(self, account): if account is None: raise Exception("Receiver account for file upload not found") def _check_sender(self, account, transfer_data): if account is None: transfer_data.update_path_for_receiver() raise Exception("Sender account for file upload not found") def _handle_lookup_result(self, result, transfer_data, connection): reject_session = all([success is not True for (success, value) in result]) if reject_session: self._reject_upload("Sender and receiver accounts for file upload were not found") return log.info('File upload from {sender.uri} to {receiver.uri} will be saved to {path}/{filename}'.format(**transfer_data.__dict__)) return self._accept_upload(transfer_data, connection) def _reject_upload(self, error): log.warning(f'File upload rejected: {error}') raise NotFound() def _accept_upload(self, transfer_data, connection): makedirs(transfer_data.path) with open(os.path.join(transfer_data.path, transfer_data.filename), 'wb') as output_file: copyfileobj(transfer_data.content, output_file) part_size = 64 * 1024 sha1 = hashlib.sha1() with open(os.path.join(transfer_data.path, transfer_data.filename), 'rb') as f: while True: data = f.read(part_size) if not data: break sha1.update(data) file_selector = FileSelector.for_file(os.path.join(transfer_data.path, transfer_data.filename)) file_selector.hash = sha1 metadata = sylkrtc.TransferredFile(**transfer_data.__dict__, hash=file_selector.hash) meta_filepath = os.path.join(transfer_data.path, f'meta-{metadata.filename}') try: with open(meta_filepath, 'w+') as output_file: output_file.write(json.dumps(metadata.__data__)) except (OSError, IOError): log.warning('Could not save metadata %s' % meta_filepath) message_handler = MessageHandler() payload = transfer_data.cpim_message_payload(metadata) message_handler.outgoing_message_to_self(f'sip:{metadata.receiver.uri}', payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}') xml_payload = transfer_data.cpim_rcsfthttp_message_payload(metadata) message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}') message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', xml_payload, content_type='message/cpim', identity=f'sip:{metadata.sender.uri}') if not metadata.filename.endswith('.asc'): message_handler.outgoing_replicated_message(f'sip:{metadata.receiver.uri}', transfer_data.message_payload, content_type='text/plain', identity=f'sip:{metadata.sender.uri}') message_handler.outgoing_message(f'sip:{metadata.receiver.uri}', transfer_data.message_payload, content_type='text/plain', identity=f'sip:{metadata.sender.uri}') return "OK" def verify_api_token(self, request, account, msg_id, token=None): # print(msg_id) # return self.get_account_messages(request, account) if token: auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) if auth_headers: try: method, auth_token = auth_headers[0].split() except ValueError: log.warning(f'Authorization headers is not correct for message history request for {account}, it should be in the format: Apikey [TOKEN]') else: log.warning(f'Authorization headers missing on message history request for {account}') if not auth_headers or method != 'Apikey' or auth_token != token: log.warning(f'Token authentication error for {account}') raise ApiTokenAuthError() else: log.info(f'Returning message history for {account}') return self.get_account_messages(request, account, msg_id) else: log.warning(f'Token not found for {account}') raise ApiTokenAuthError() def tokenError(self, error, request): raise ApiTokenAuthError() def get_account_messages(self, request, account, msg_id=None): log.info(f'Returning message history for {account}') account = account.lower() storage = MessageStorage() messages = storage[[account, msg_id]] request.setHeader('Content-Type', 'application/json') if isinstance(messages, defer.Deferred): return messages.addCallback(lambda result: json.dumps(sylkrtc.MessageHistoryData(account=account, messages=result).__data__)) @app.handle_errors(ApiTokenAuthError) def auth_error(self, request, failure): request.setResponseCode(401) return b'Unauthorized' @app.route('/messages/history/', methods=['OPTIONS', 'GET']) @app.route('/messages/history//', methods=['OPTIONS', 'GET']) def messages(self, request, account, msg_id=None): storage = MessageStorage() token = storage.get_account_token(account) if isinstance(token, defer.Deferred): token.addCallback(lambda result: self.verify_api_token(request, account, msg_id, result)) return token else: return self.verify_api_token(request, account, msg_id, token) class WebHandler(object): def __init__(self): self.backend = None self.factory = None self.resource = None self.web = None def start(self): ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws' self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version) self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins, allowNullOrigin=GeneralConfig.web_origins == ['*'], autoPingInterval=GeneralConfig.websocket_ping_interval, autoPingTimeout=GeneralConfig.websocket_ping_interval/2) self.web = WebRTCGatewayWeb(self.factory) server.register_resource(b'webrtcgateway', self.web.resource) log.info('WebSocket handler started at %s' % ws_url) log.info('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins)) log.info('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains)) log.info('Using Janus API: %s' % JanusConfig.api_url) self.backend = JanusBackend() self.backend.start() def stop(self): if self.factory is not None: for conn in self.factory.connections.copy(): conn.dropConnection(abort=True) self.factory = None if self.backend is not None: self.backend.stop() self.backend = None # TODO: This implementation is a prototype. Moving forward it probably makes sense to provide admin API # capabilities for other applications too. This could be done in a number of ways: # # * On the main web server, under a /admin/ parent route. # * On a separate web server, which could listen on a different IP and port. # # In either case, HTTPS aside, a token based authentication mechanism would be desired. # Which one is best is not 100% clear at this point. class AuthError(Exception): pass class AdminWebHandler(object, metaclass=Singleton): app = Klein() def __init__(self): self.listener = None def start(self): host, port = GeneralConfig.http_management_interface # noinspection PyUnresolvedReferences self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host) log.info('Admin web handler started at http://%s:%d' % (host, port)) def stop(self): if self.listener is not None: self.listener.stopListening() self.listener = None # Admin web API def _check_auth(self, request): auth_secret = GeneralConfig.http_management_auth_secret if auth_secret: auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) if not auth_headers or auth_headers[0] != auth_secret: raise AuthError() @app.handle_errors(AuthError) def auth_error(self, request, failure): request.setResponseCode(403) return 'Authentication error' @app.route('/tokens/') def get_tokens(self, request, account): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() tokens = storage[account] if isinstance(tokens, defer.Deferred): return tokens.addCallback(lambda result: json.dumps({'tokens': result})) else: return json.dumps({'tokens': tokens}) @app.route('/tokens//', methods=['DELETE']) def process_token(self, request, account, device_token): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() if request.method == 'DELETE': storage.remove(account, device_token) return json.dumps({'success': True}) diff --git a/webrtcgateway.ini.sample b/webrtcgateway.ini.sample index 024ef9d..f2fa814 100644 --- a/webrtcgateway.ini.sample +++ b/webrtcgateway.ini.sample @@ -1,101 +1,105 @@ ; SylkServer WebRTC gateway configuration file ; ; For the gateway to work Janus needs to be properly installed and configured, ; please refer to README.webrtc for detailed instructions ; ; [WebServer] section in config.ini must be properly configured for using ; HTTPS transport [General] ; List of allowed web origins. The connection origin (Origin header in the ; HTTP request) will be checked against the list defined here, if the domain ; is no allowed the connection will be refused. ; * (the default) means any ; web_origins = * ; Proxy used for outbound SIP traffic ; outbound_sip_proxy = ; List of allowed SIP domains for managing accounts ; sip_domains = * ; Boolean indicating if the WebSocket messages sent to/from clients should be ; logged to a file ; trace_client = False ; WebSocket Ping frames are sent at the configured interval, this helps detect ; dead client connections ; websocket_ping_interval = 120 ; IP and port for the HTTP management interface as IP[:PORT] ; http_management_interface = 127.0.0.1:20888 ; Shared secret for the HTTP management interface (Authorization: THE_KEY) ; http_management_auth_secret = ; Sylk-push URL to send conference push notification ; sylk_push_url = ; Send sip messages only to ourself, skipping any outbound destination. ; This setting should be enabled if you have no SIP proxy, no outbound proxy ; configured and a different authentication method. You should also add the ; listening IP to the trusted list in the main configuration ; local_sip_messages = +; Application directory. This set the path where the application stores non +; temporary files +; application_directory = /var/lib/sylkserver/webrtcgateway + ; Remove big transfered files (> 50MB) after this amount of days. Smaller ; files (< 50MB) will be removed after 2 * filetransfer_expire_days ; filetransfer_expire_days = 15 [Janus] ; URL pointing to the Janus API endpoint (only WebSocket is supported) ; api_url = ws://127.0.0.1:8188 ; API secret shared with Janus (must match the value in janus.cfg) ; A random UUID value is recommended, a new value can be generated with ; the following command: ; python -c 'import uuid; print(uuid.uuid4().hex)' api_secret = 0745f2f74f34451c89343afcdcae5809 ; Boolean indicating if the messages between SylkServer and Janus should be ; logged to a file ; trace_janus = False ; Maximum video bitrate allowed per sender in a room in bits/s. This value is ; applied to any room that doesn't define its own. The value is any integer ; number between 64000 and 4194304. Default value is 2016000 (~2Mb/s). ; max_bitrate = 2016000 ; The video codec to be used by all participants in a room. This value is ; applied to any room that doesn't define its own. ; Possible values are: h264, vp8 and vp9. Default is vp9. ; video_codec = vp9 ; code used to decline the calls (usually, 486 busy, 603 busy everywhere). ; For 4XX codes, tipically a SIP Proxy will wait until other devices answer, ; for 6XX codes, tipically a SIP Proxy will end the call forking ; decline_code = 486 [Cassandra] ; Contact points to cassandra cluster ; cluster_contact_points = ; Keyspace to use for storage ; keyspace = ; Table name for token storage ; push_tokens_table = push_tokens ; Per room configuration options ; [room1@videoconference.example.com] ; record = True ; access_policy = deny, allow ; deny = all ; allow = domain1.com, test1@example.com, test2@example.com ; max_bitrate = 512000 ; video_codec = h264 ; video_disabled = False ; invite_participants = test@example.com ; persistent = False