diff --git a/sylk-db-maintenance b/sylk-db-maintenance index 463e5da..5eb749f 100755 --- a/sylk-db-maintenance +++ b/sylk-db-maintenance @@ -1,353 +1,357 @@ #!/usr/bin/env python3 import argparse import sys import os from application import log from application.process import process CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine.query import LWTException from cassandra.cluster import Cluster, Session, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import DCAwareRoundRobinPolicy from cassandra.cqlengine.management import sync_table, create_keyspace_simple class colors: if sys.stdout.isatty(): TGREEN = '\033[32m' ENDC = '\033[m' BOLD = '\033[1m' else: TGREEN = '' ENDC = '' BOLD = '' class parse_level: def format(record): if record.levelname != 'INFO': return f'{record.levelname:<8s} ' else: return f"{' ......':<8s} " def ask(question): try: while (res := input(colors.TGREEN + f"{'>>':>8s} {question} (Enter y/n) " + colors.ENDC).lower()) not in {"y", "n"}:pass except KeyboardInterrupt: sys.exit(1) if res == "y": return True return False def bold(string): return colors.BOLD + string + colors.ENDC def db_init(): log.info(f"\n{' SylkServer - Cassandra database create/maintenance ':*^80s}\n") log.warn('Please note, this script can potentially destroy the data in the configured Cassandra keyspace.') log.warn('Make sure you have a backup if you already have data in the Cassandra cluster') if not ask("Would you like to continue?"): sys.exit() os.environ['CQLENG_ALLOW_SCHEMA_MANAGEMENT'] = '1' from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE: if CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) if CassandraConfig.keyspace in cluster.metadata.keyspaces: log.info(f"Keyspace {bold(CassandraConfig.keyspace)} is already on the server.") else: log.warning(f"Keyspace {bold(CassandraConfig.keyspace)} is {bold('not')} defined on the server") if ask("Would you like to create the keyspace with SimpleStrategy?"): create_keyspace_simple(CassandraConfig.keyspace, 1) else: sys.exit(1) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace if table.__table_name__ in cluster.metadata.keyspaces[CassandraConfig.keyspace].tables: log.info(f'Table {bold(table.__table_name__)} is in the keyspace {bold(CassandraConfig.keyspace)}') if ask("Would you like to update the schema with the model?"): sync_table(table) else: log.info(f'Table {bold(table.__table_name__)} is not the keyspace {bold(CassandraConfig.keyspace)}') if ask("Would you like to create the table from the model?"): sync_table(table) else: log.warning("Cassandra cluster contact points are not set, please adjust webrtcgateway.ini'") sys.exit() else: log.warning('The python Cassandra drivers are not installed, please make sure they are installed') sys.exit() def remove_account(account): log.info(f"\n{' SylkServer - Remove account ':*^80s}\n") log.warn(f'Please note, will destroy {bold("ALL")} data of the account {account}') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: size = len(messages) for message in messages: message.delete() log.info(f'Removed {size} messages from {account}') try: accounts = ChatAccount.objects(ChatAccount.account == account) except LWTException: pass else: size = len(accounts) for acc in accounts: acc.delete() log.info(f'Removed {account} from accounts') username, domain = account.split('@') try: push_tokens = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain) except LWTException: pass else: size = len(push_tokens) for token in push_tokens: token.delete() log.info(f'Removed {size} push tokens from {account}') try: public_keys = PublicKey.objects(PublicKey.account == account) except LWTException: pass else: size = len(public_keys) for key in public_keys: key.delete() pass log.info(f'Removed public Key from {account}') else: log.warning('The python Cassandra drivers are not installed.') log.warning('We will use the JSON and pickle backend to wipe the account data') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.storage import TokenStorage, MessageStorage from sipsimple.threading import ThreadManager import time log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 storage = TokenStorage() storage.load() time.sleep(.5) storage.removeAll(account) storage = MessageStorage() storage.load() time.sleep(.5) storage.remove_account(account) storage.remove_public_key(account) storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') if os.path.exists(os.path.join(storage_path, account[0], f'{account}_messages.json')): os.remove(os.path.join(storage_path, account[0], f'{account}_messages.json')) ThreadManager().stop() def remove_messages(account, contact): log.info(f"\n{' SylkServer - Remove messages ':*^80s}\n") if contact is not None: log.warn(f'Please note, will destroy the messages from {contact} for the account {account} ') else: log.warn(f'Please note, will destroy {bold("ALL")} messages of the account {account}') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import ChatMessage from sylk.applications.webrtcgateway.storage import MessageStorage from sipsimple.threading import ThreadManager log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 if contact is not None: storage = MessageStorage() storage.load() storage.removeChat(account, contact) ThreadManager().stop() log.info(f'Messages from {contact} for {account} removed') sys.exit() configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [ChatMessage] for table in tables: table.__keyspace__ = CassandraConfig.keyspace try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: size = len(messages) for message in messages: message.delete() log.info(f'Removed {size} messages from {account}') else: storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') if os.path.exists(os.path.join(storage_path, account[0], f'{account}_messages.json')): os.remove(os.path.join(storage_path, account[0], f'{account}_messages.json')) log.info('Messages for {account} removed') if __name__ == '__main__': process.configuration.subdirectory = 'sylkserver' parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.') parser.add_argument("--config-dir", dest='config_directory', default=None, metavar='PATH', help="Set a config directory.") subparsers = parser.add_subparsers(dest='action') dbinit = subparsers.add_parser('dbinit', help='initialize/Update database (default)') remove = subparsers.add_parser('remove', help='remove all data from an account') remove.add_argument('account', help='Account') remove_messages_sub = subparsers.add_parser('remove_messages', help='remove all messages from an account') remove_messages_sub.add_argument('account', help='Account') remove_messages_sub.add_argument('contact', nargs='?', help='optional contact to remove messages from') options = parser.parse_args() if options.config_directory is not None: process.configuration.local_directory = options.config_directory log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 from sylk.server import ServerConfig log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 + if not options.action: + parser.print_help() + sys.exit() + configuration = ServerConfig.__cfgtype__(ServerConfig.__cfgfile__) if configuration.files: log.info('Reading configuration from {}'.format(', '.join(configuration.files))) else: log.info('Not reading any configuration files (using internal defaults)') if options.action == 'remove': remove_account(options.account) elif options.action == 'remove_messages': remove_messages(options.account, options.contact) - else: + elif options.action == 'dbinit': db_init()