diff --git a/xcap/backend/__init__.py b/xcap/backend/__init__.py index 010e65c..8d1ad54 100644 --- a/xcap/backend/__init__.py +++ b/xcap/backend/__init__.py @@ -1,52 +1,66 @@ +from abc import ABC, abstractmethod +from typing import Optional -"""Interface to the backend subsystem""" +from pydantic import BaseModel +from starlette.background import BackgroundTasks -__all__ = ['database', 'opensips'] +from xcap.uri import XCAPUri -from zope.interface import Interface +class CustomBaseModel(BaseModel): + def __init__(self, *args, **kwargs): + # Get the field names from the class + field_names = list(self.__annotations__.keys()) -class StatusResponse(object): - def __init__(self, code, etag=None, data=None, old_etag=None): - self.code = code - self.etag = etag - self.data = data - self.old_etag = old_etag + # Check if we have the right number of positional arguments + if len(args) > len(field_names): + raise ValueError(f"Too many positional arguments. Expected at most {len(field_names)}") + + # Assign positional arguments to keyword arguments dynamically + for i, field in enumerate(field_names): + if i < len(args): # Only assign if we have enough positional arguments + kwargs[field] = args[i] + + # Now call the parent __init__ method to handle the rest + super().__init__(**kwargs) + + +class StatusResponse(CustomBaseModel): + code: int + etag: Optional[str] = None + data: Optional[bytes] = None # document content, stored as bytes + old_etag: Optional[str] = None + background: Optional[BackgroundTasks] = None @property def succeed(self): return 200 <= self.code <= 299 -class StorageError(Exception): pass - + class Config: + arbitrary_types_allowed = True -class IStorage(Interface): - """Storage interface. It defines the methods an XCAP storage class must implement.""" - def get_document(self, uri, check_etag): - """Fetch an XCAP document. +class BackendInterface(ABC): - @param uri: an XCAP URI that contains the XCAP user and the document selector - - @param check_etag: a callable used to check the etag of the stored document + def _normalize_document_path(self, uri): + if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"): + # some clients e.g. counterpath's eyebeam save presence rules under + # different filenames between versions and they expect to find the same + # information, thus we are forcing all presence rules documents to be + # saved under "index.xml" default filename + uri.doc_selector.document_path = "index.xml" - @returns: a deferred that'll be fired when the document is fetched""" + @abstractmethod + async def get_document(self, uri: XCAPUri, check_etag) -> Optional[StatusResponse]: + """Retrieve data for a specific resource.""" + pass - def put_document(self, uri, document, check_etag): - """Insert or replace an XCAP document. - - @param uri: an XCAP URI that contains the XCAP user and the document selector - - @param document: the XCAP document - - @param check_etag: a callable used to check the etag of the stored document - - @returns: a deferred that'll be fired when the action was completed.""" + @abstractmethod + async def put_document(self, uri: XCAPUri, document: bytes, check_etag) -> Optional[StatusResponse]: + """Insert or replace the document for a specific resource.""" + pass - def delete_document(self, uri, check_etag): - """Delete an XCAP document.
- - @param uri: an XCAP URI that contains the XCAP user and the document selector - - @param check_etag: a callable used to check the etag of the document to be deleted - """ + @abstractmethod + async def delete_document(self, uri: XCAPUri, check_etag) -> Optional[StatusResponse]: + """Delete the document for a specific resource.""" + pass diff --git a/xcap/backend/database.py b/xcap/backend/database.py index 2dc8ef3..9bdaf5e 100644 --- a/xcap/backend/database.py +++ b/xcap/backend/database.py @@ -1,406 +1,168 @@ +from sqlalchemy.exc import SQLAlchemyError +from sqlmodel import select -"""Implementation of a database backend.""" +from xcap.backend import BackendInterface, StatusResponse +from xcap.db.manager import get_auth_db_session, get_db_session +from xcap.db.models import XCAP, Subscriber, Watcher +from xcap.dbutil import make_random_etag -import sys -from application import log -from application.configuration import ConfigSection -from application.python.types import Singleton - -from zope.interface import implements -from twisted.cred import credentials, checkers, error as credError -from twisted.internet import defer - -from _mysql_exceptions import IntegrityError - -import xcap -from xcap.backend import IStorage, StatusResponse -from xcap.dbutil import connectionForURI, repeat_on_error, make_random_etag - - -class Config(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Database' - - authentication_db_uri = '' - storage_db_uri = '' - subscriber_table = 'subscriber' - user_col = 'username' - domain_col = 'domain' - password_col = 'password' - ha1_col = 'ha1' - xcap_table = 'xcap' - -if not Config.authentication_db_uri or not Config.storage_db_uri: - log.critical('Authentication DB URI and Storage DB URI must be provided') - sys.exit(1) - - -class DBBase(object): - def __init__(self): - self._db_connect() - - -class PasswordChecker(DBBase): - """A credentials checker against a database subscriber table.""" - - implements(checkers.ICredentialsChecker) - - credentialInterfaces = (credentials.IUsernamePassword, - credentials.IUsernameHashedPassword) - - def _db_connect(self): - self.conn = auth_db_connection(Config.authentication_db_uri) - - def _query_credentials(self, credentials): - raise NotImplementedError - - def _got_query_results(self, rows, credentials): - if not rows: - raise credError.UnauthorizedLogin("Unauthorized login") - else: - return self._authenticate_credentials(rows[0][0], credentials) - - def _authenticate_credentials(self, password, credentials): - raise NotImplementedError - - def _checkedPassword(self, matched, username, realm): - if matched: - username = username.split('@', 1)[0] - ## this is the avatar ID - return "%s@%s" % (username, realm) - else: - raise credError.UnauthorizedLogin("Unauthorized login") - - def requestAvatarId(self, credentials): - """Return the avatar ID for the credentials which must have the username - and realm attributes, or an UnauthorizedLogin in case of a failure.""" - d = self._query_credentials(credentials) - return d - - -class PlainPasswordChecker(PasswordChecker): - """A credentials checker against a database subscriber table, where the passwords - are stored in plain text.""" - - implements(checkers.ICredentialsChecker) - - def _query_credentials(self, credentials): - username, domain = credentials.username.split('@', 1)[0], credentials.realm - query = """SELECT %(password_col)s - FROM %(table)s - WHERE %(user_col)s = %%(username)s - AND %(domain_col)s = %%(domain)s""" % { - "password_col": Config.password_col, -
"user_col": Config.user_col, - "domain_col": Config.domain_col, - "table": Config.subscriber_table } - params = {"username": username, - "domain": domain} - return self.conn.runQuery(query, params).addCallback(self._got_query_results, credentials) - - def _authenticate_credentials(self, hash, credentials): - return defer.maybeDeferred( - credentials.checkPassword, hash).addCallback( - self._checkedPassword, credentials.username, credentials.realm) - - -class HashPasswordChecker(PasswordChecker): - """A credentials checker against a database subscriber table, where the passwords - are stored as MD5 hashes.""" - - implements(checkers.ICredentialsChecker) - - def _query_credentials(self, credentials): - username, domain = credentials.username.split('@', 1)[0], credentials.realm - query = """SELECT %(ha1_col)s - FROM %(table)s - WHERE %(user_col)s = %%(username)s - AND %(domain_col)s = %%(domain)s""" % { - "ha1_col": Config.ha1_col, - "user_col": Config.user_col, - "domain_col": Config.domain_col, - "table": Config.subscriber_table} - params = {"username": username, - "domain": domain} - return self.conn.runQuery(query, params).addCallback(self._got_query_results, credentials) - - def _authenticate_credentials(self, hash, credentials): - return defer.maybeDeferred( - credentials.checkHash, hash).addCallback( - self._checkedPassword, credentials.username, credentials.realm) class Error(Exception): - def __init__(self): if hasattr(self, 'msg'): return Exception.__init__(self, self.msg) else: return Exception.__init__(self) -class RaceError(Error): - """The errors of this type are raised for the requests that failed because - of concurrent modification of the database by other clients. - - For example, before DELETE we do SELECT first, to check that a document of the - right etag exists. The actual check is performed by a function in twisted - that is passed as a callback. Then etag from the SELECT request is used in the - DELETE request. - - This seems unnecessary convoluted and probably should be changed to - 'DELETE .. WHERE etag=ETAG'. We still need to find out whether DELETE was - actually performed. - """ - -class UpdateFailed(RaceError): - msg = 'UPDATE request failed' - -class DeleteFailed(RaceError): - msg = 'DELETE request failed' class MultipleResultsError(Error): """This should never happen. If it did happen. that means either the table was corrupted or there's a logic error""" def __init__(self, params): Exception.__init__(self, 'database request has more than one result: ' + repr(params)) -class Storage(DBBase, metaclass=Singleton): - implements(IStorage) - app_mapping = {"pres-rules" : 1<<1, - "resource-lists" : 1<<2, - "rls-services" : 1<<3, - "pidf-manipulation" : 1<<4, - "org.openmobilealliance.pres-rules" : 1<<5, - "org.openmobilealliance.pres-content" : 1<<6, - "org.openxcap.dialog-rules" : 1<<7, - "test-app" : 0} +class DeleteFailed(Error): + msg = 'DELETE request failed' + - def _db_connect(self): - self.conn = storage_db_connection(Config.storage_db_uri) +class PasswordChecker(object): + async def query_user(self, credentials): + async with get_auth_db_session() as db_session: + result = await db_session.execute(select(Subscriber).where( + Subscriber.username == credentials.username, Subscriber.domain == credentials.realm)) + return result.first() - def _normalize_document_path(self, uri): - if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"): - # some clients e.g. 
counterpath's eyebeam save presence rules under - # different filenames between versions and they expect to find the same - # information, thus we are forcing all presence rules documents to be - # saved under "index.xml" default filename - uri.doc_selector.document_path = "index.xml" - def _get_document(self, trans, uri, check_etag): +class DatabaseStorage(BackendInterface): + app_mapping = {"pres-rules" : 1 << 1, + "resource-lists" : 1 << 2, + "rls-services" : 1 << 3, + "pidf-manipulation" : 1 << 4, + "org.openmobilealliance.pres-rules" : 1 << 5, + "org.openmobilealliance.pres-content" : 1 << 6, + "org.openxcap.dialog-rules" : 1 << 7, + "test-app" : 0} + + async def fetch_document(self, uri): username, domain = uri.user.username, uri.user.domain self._normalize_document_path(uri) doc_type = self.app_mapping[uri.application_id] - query = """SELECT doc, etag FROM %(table)s - WHERE username = %%(username)s AND domain = %%(domain)s - AND doc_type= %%(doc_type)s AND doc_uri=%%(document_path)s""" % { - "table": Config.xcap_table} - params = {"username": username, - "domain" : domain, - "doc_type": doc_type, - "document_path": uri.doc_selector.document_path} - trans.execute(query, params) - result = trans.fetchall() - if len(result)>1: - raise MultipleResultsError(params) - elif result: - doc, etag = result[0] + document_path = uri.doc_selector.document_path + + async with get_db_session() as db_session: + result = await db_session.execute(select(XCAP).where( + XCAP.username == username, + XCAP.domain == domain, XCAP.doc_type == doc_type, + XCAP.doc_uri == document_path)) + results = result.all() + if results and len(results) > 1: + raise MultipleResultsError({"username": username, + "domain": domain, + "doc_type": doc_type, + "document_path": document_path}) + return results + + async def get_document(self, uri, check_etag): + results = await self.fetch_document(uri) + if results: + doc = results[0][0].doc + etag = results[0][0].etag + if isinstance(doc, str): doc = doc.encode('utf-8') check_etag(etag) - response = StatusResponse(200, etag, doc) - else: - response = StatusResponse(404) - return response + return StatusResponse(200, etag, doc) + + return StatusResponse(404) + + async def put_document(self, uri, document, check_etag): + results = await self.fetch_document(uri) + if results: + existing_doc = results[0][0] + old_etag = existing_doc.etag + check_etag(old_etag) # Check if etag matches + etag = make_random_etag(uri) # Generate a new etag + old_data = existing_doc + + # Update fields + params = { + "doc": document, + "etag": etag + } + for key, value in params.items(): + setattr(old_data, key, value) + + async with get_db_session() as db_session: + db_session.add(old_data) + await db_session.commit() + await db_session.refresh(old_data) - def _put_document(self, trans, uri, document, check_etag): - username, domain = uri.user.username, uri.user.domain - self._normalize_document_path(uri) - doc_type = self.app_mapping[uri.application_id] - document_path = uri.doc_selector.document_path - query = """SELECT etag FROM %(table)s - WHERE username = %%(username)s AND domain = %%(domain)s - AND doc_type= %%(doc_type)s AND doc_uri=%%(document_path)s""" % { - "table": Config.xcap_table} - params = {"username": username, - "domain" : domain, - "doc_type": doc_type, - "document_path": document_path} - trans.execute(query, params) - result = trans.fetchall() - if len(result) > 1: - raise MultipleResultsError(params) - elif not result: - check_etag(None, False) - ## the document doesn't exist, 
create it - etag = make_random_etag(uri) - query = """INSERT INTO %(table)s (username, domain, doc_type, etag, doc, doc_uri) - VALUES (%%(username)s, %%(domain)s, %%(doc_type)s, %%(etag)s, %%(document)s, %%(document_path)s)""" % { - "table": Config.xcap_table } - params = {"username": username, - "domain" : domain, - "doc_type": doc_type, - "etag": etag, - "document": document, - "document_path": document_path} - # may raise IntegrityError here, if the document was created in another connection - # will be catched by repeat_on_error - trans.execute(query, params) - return StatusResponse(201, etag) - else: - old_etag = result[0][0] - ## first check the etag of the existing resource - check_etag(old_etag) - ## the document exists, replace it - etag = make_random_etag(uri) - query = """UPDATE %(table)s - SET doc = %%(document)s, etag = %%(etag)s - WHERE username = %%(username)s AND domain = %%(domain)s - AND doc_type = %%(doc_type)s AND etag = %%(old_etag)s - AND doc_uri = %%(document_path)s""" % { - "table": Config.xcap_table } - params = {"document": document, - "etag": etag, - "username": username, - "domain" : domain, - "doc_type": doc_type, - "old_etag": old_etag, - "document_path": document_path} - trans.execute(query, params) - # the request may not update anything (e.g. if etag was changed by another connection - # after we did SELECT); if so, we should retry - updated = getattr(trans._connection, 'affected_rows', lambda : 1)() - if not updated: - raise UpdateFailed - assert updated == 1, updated return StatusResponse(200, etag, old_etag=old_etag) - def _delete_document(self, trans, uri, check_etag): + # If no document exists, create a new one username, domain = uri.user.username, uri.user.domain - self._normalize_document_path(uri) doc_type = self.app_mapping[uri.application_id] document_path = uri.doc_selector.document_path - query = """SELECT etag FROM %(table)s - WHERE username = %%(username)s AND domain = %%(domain)s - AND doc_type= %%(doc_type)s AND doc_uri = %%(document_path)s""" % { - "table": Config.xcap_table} - params = {"username": username, - "domain" : domain, - "doc_type": doc_type, - "document_path": document_path} - trans.execute(query, params) - result = trans.fetchall() - if len(result)>1: - raise MultipleResultsError(params) - elif result: - etag = result[0][0] - check_etag(etag) - query = """DELETE FROM %(table)s - WHERE username = %%(username)s AND domain = %%(domain)s - AND doc_type= %%(doc_type)s AND doc_uri = %%(document_path)s - AND etag = %%(etag)s""" % {"table" : Config.xcap_table} - params = {"username": username, - "domain" : domain, - "doc_type": doc_type, - "document_path": document_path, - "etag": etag} - trans.execute(query, params) - deleted = getattr(trans._connection, 'affected_rows', lambda : 1)() - if not deleted: - # the document was replaced/removed after the SELECT but before the DELETE - raise DeleteFailed - assert deleted == 1, deleted - return StatusResponse(200, old_etag=etag) - else: - return StatusResponse(404) - - def _delete_all_documents(self, trans, uri): - username, domain = uri.user.username, uri.user.domain - query = """DELETE FROM %(table)s - WHERE username = %%(username)s AND domain = %%(domain)s - """ % {"table" : Config.xcap_table} - params = {"username": username, - "domain" : domain} - trans.execute(query, params) - return StatusResponse(200) - - def get_document(self, uri, check_etag): - return self.conn.runInteraction(self._get_document, uri, check_etag) - - def put_document(self, uri, document, check_etag): - return 
repeat_on_error(10, (UpdateFailed, IntegrityError), - self.conn.runInteraction, self._put_document, uri, document, check_etag) - def delete_document(self, uri, check_etag): - return repeat_on_error(10, DeleteFailed, self.conn.runInteraction, self._delete_document, uri, check_etag) + check_etag(None, False) + etag = make_random_etag(uri) # Generate a new etag for the new document + params = { + "username": username, + "domain": domain, + "doc_type": doc_type, + "etag": etag, + "doc": document, + "doc_uri": document_path + } + new_doc = XCAP(**params) + async with get_db_session() as db_session: + db_session.add(new_doc) + await db_session.commit() + await db_session.refresh(new_doc) + + return StatusResponse(201, etag) + + async def delete_document(self, uri, check_etag): + results = await self.fetch_document(uri) + + if results: + etag = results[0][0].etag + check_etag(etag) + async with get_db_session() as db_session: + try: + await db_session.delete(results[0][0]) + await db_session.commit() + except SQLAlchemyError: + raise DeleteFailed - def delete_documents(self, uri): - return self.conn.runInteraction(self._delete_all_documents, uri) + return StatusResponse(200, old_etag=etag) + return StatusResponse(404) - # Application-specific functions - def _get_watchers(self, trans, uri): + async def get_watchers(self, uri): status_mapping = {1: "allow", 2: "confirm", 3: "deny"} presentity_uri = "sip:%s@%s" % (uri.user.username, uri.user.domain) - query = """SELECT watcher_username, watcher_domain, status FROM watchers - WHERE presentity_uri = %(puri)s""" - params = {'puri': presentity_uri} - trans.execute(query, params) - result = trans.fetchall() - watchers = [{"id": "%s@%s" % (w_user, w_domain), - "status": status_mapping.get(subs_status, "unknown"), - "online": "false"} for w_user, w_domain, subs_status in result] - query = """SELECT watcher_username, watcher_domain FROM active_watchers - WHERE presentity_uri = %(puri)s AND event = 'presence'""" - trans.execute(query, params) - result = trans.fetchall() - active_watchers = set("%s@%s" % pair for pair in result) - for watcher in watchers: - if watcher["id"] in active_watchers: - watcher["online"] = "true" - return watchers - - def get_watchers(self, uri): - return self.conn.runInteraction(self._get_watchers, uri) - - def _get_documents_list(self, trans, uri): - query = """SELECT doc_type, doc_uri, etag FROM %(table)s - WHERE username = %%(username)s AND domain = %%(domain)s""" % {'table': Config.xcap_table} - params = {'username': uri.user.username, 'domain': uri.user.domain} - trans.execute(query, params) - result = trans.fetchall() - docs = {} - for r in result: - app = [k for k, v in self.app_mapping.items() if v == r[0]][0] - if app in docs: - docs[app].append((r[1], r[2])) - else: - docs[app] = [(r[1], r[2])] # Ex: {'pres-rules': [('index.html', '4564fd9c9a2a2e3e796310b00c9908aa')]} - return docs - - def get_documents_list(self, uri): - return self.conn.runInteraction(self._get_documents_list, uri) - - -installSignalHandlers = True - -def auth_db_connection(uri): - conn = connectionForURI(uri) - return conn - -def storage_db_connection(uri): - conn = connectionForURI(uri) - def cb(res): - if res[0:1][0:1] and res[0][0]: - print('%s xcap documents in the database' % res[0][0]) - return res - def eb(fail): - fail.printTraceback() - return fail - # connect early, so database problem are detected early - d = conn.runQuery('SELECT count(*) from %s' % Config.xcap_table) - d.addCallback(cb) - d.addErrback(eb) - return conn + async with 
get_db_session() as db_session: + result = await db_session.execute(select(Watcher).where( + Watcher.presentity_uri == presentity_uri)) + result_list = result.all() + + watchers = [{"id": "%s@%s" % (w_user, w_domain), + "status": status_mapping.get(subs_status, "unknown"), + "online": "false"} for w_user, w_domain, subs_status in result_list] + + result = await db_session.execute(select(Watcher).where( + Watcher.presentity_uri == presentity_uri, Watcher.event == 'presence')) + result_list = result.all() + active_watchers = set("%s@%s" % pair for pair in result_list) + for watcher in watchers: + if watcher["id"] in active_watchers: + watcher["online"] = "true" + return watchers + + +Storage = DatabaseStorage diff --git a/xcap/backend/opensips.py b/xcap/backend/opensips.py index b133e7e..d80d73c 100644 --- a/xcap/backend/opensips.py +++ b/xcap/backend/opensips.py @@ -1,121 +1,123 @@ - -"""Implementation of an OpenSIPS backend.""" - import re +from typing import Union from application import log -from application.configuration import ConfigSection, ConfigSetting -from application.configuration.datatypes import IPAddress -from application.notification import IObserver, NotificationCenter +from application.configuration import ConfigSetting +from application.notification import (IObserver, Notification, + NotificationCenter) from application.python import Null from application.python.types import Singleton -from sipsimple.core import Engine, FromHeader, Header, Publication, RouteHeader, SIPURI from sipsimple.configuration.datatypes import SIPProxyAddress -from sipsimple.threading import run_in_twisted_thread -from zope.interface import implements - -import xcap -from xcap.datatypes import XCAPRootURI -from xcap.backend import database +from sipsimple.core import (SIPURI, Engine, FromHeader, Header, Publication, + RouteHeader) +from sipsimple.lookup import DNSLookup +from sipsimple.threading.green import run_in_green_thread +from starlette.background import BackgroundTask +from zope.interface import implementer + +from xcap import __version__ +from xcap.backend import StatusResponse +from xcap.backend.database import DatabaseStorage, PasswordChecker +from xcap.configuration import OpensipsConfig as XCAPOpensipsConfig +from xcap.configuration import ServerConfig +from xcap.types import CheckETagType +from xcap.uri import XCAPUri from xcap.xcapdiff import Notifier -class ServerConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Server' - - address = ConfigSetting(type=IPAddress, value='0.0.0.0') - root = ConfigSetting(type=XCAPRootURI, value=None) - - -class Config(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'OpenSIPS' - - publish_xcapdiff = False - outbound_sip_proxy = '' - - -class PlainPasswordChecker(database.PlainPasswordChecker): pass -class HashPasswordChecker(database.HashPasswordChecker): pass +class OpensipsConfig(XCAPOpensipsConfig): + outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) +@implementer(IObserver) class SIPNotifier(object, metaclass=Singleton): - implements(IObserver) def __init__(self): self.engine = Engine() self.engine.start( - ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, - user_agent="OpenXCAP %s" % xcap.__version__, + ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, + tcp_port=ServerConfig.tcp_port, + user_agent="OpenXCAP %s" % __version__, ) self.sip_prefix_re = re.compile('^sips?:') try: - self.outbound_proxy =
SIPProxyAddress.from_description(Config.outbound_sip_proxy) + outbound_sip_proxy = OpensipsConfig.outbound_sip_proxy + self.outbound_proxy = SIPURI(host=outbound_sip_proxy.host, + port=outbound_sip_proxy.port, + parameters={'transport': 'tcp'}) except ValueError: - log.warning('Invalid SIP proxy address specified: %s' % Config.outbound_sip_proxy) + log.warning('Invalid SIP proxy address specified: %s' % OpensipsConfig.outbound_sip_proxy) self.outbound_proxy = None + NotificationCenter().add_observer(self) - def send_publish(self, uri, body): - if self.outbound_proxy is None: + @run_in_green_thread + def send_publish(self, uri, body=None): + if self.outbound_proxy is None or body is None: return - uri = self.sip_prefix_re.sub('', uri) - publication = Publication(FromHeader(SIPURI(uri)), + + self.body = body + self.uri = self.sip_prefix_re.sub('', uri) + lookup = DNSLookup() + NotificationCenter().add_observer(self, sender=lookup) + lookup.lookup_sip_proxy(self.outbound_proxy, ["udp", "tcp", "tls"]) + + def handle_notification(self, notification: Notification) -> None: + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_DNSLookupDidSucceed(self, notification: Notification) -> None: + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=notification.sender) + + publication = Publication(FromHeader(SIPURI(self.uri)), "xcap-diff", "application/xcap-diff+xml", duration=0, extra_headers=[Header('Thor-Scope', 'publish-xcap')]) - NotificationCenter().add_observer(self, sender=publication) - route_header = RouteHeader(SIPURI(host=self.outbound_proxy.host, port=self.outbound_proxy.port, parameters=dict(transport=self.outbound_proxy.transport))) - publication.publish(body, route_header, timeout=5) + notification_center.add_observer(self, sender=publication) + route = notification.data.result[0] + route_header = RouteHeader(route.uri) + publication.publish(self.body, route_header, timeout=5) - @run_in_twisted_thread - def handle_notification(self, notification): - handler = getattr(self, '_NH_%s' % notification.name, Null) - handler(notification) + def _NH_DNSLookupDidFail(self, notification: Notification) -> None: + notification.center.remove_observer(self, sender=notification.sender) - def _NH_SIPPublicationDidSucceed(self, notification): + def _NH_SIPPublicationDidSucceed(self, notification: Notification) -> None: log.info('PUBLISH for xcap-diff event successfully sent to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) - def _NH_SIPPublicationDidEnd(self, notification): + def _NH_SIPPublicationDidEnd(self, notification: Notification) -> None: log.info('PUBLISH for xcap-diff event ended for %s' % notification.sender.from_header.uri) notification.center.remove_observer(self, sender=notification.sender) - def _NH_SIPPublicationDidFail(self, notification): + def _NH_SIPPublicationDidFail(self, notification: Notification) -> None: log.info('PUBLISH for xcap-diff event failed to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) notification.center.remove_observer(self, sender=notification.sender) -class NotifyingStorage(database.Storage): +class NotifyingStorage(DatabaseStorage): def __init__(self): super(NotifyingStorage, self).__init__() self._sip_notifier = SIPNotifier() self.notifier = Notifier(ServerConfig.root, self._sip_notifier.send_publish) - def put_document(self, uri, document, check_etag): - d = super(NotifyingStorage, 
self).put_document(uri, document, check_etag) - d.addCallback(lambda result: self._on_put(result, uri)) - return d - - def _on_put(self, result, uri): + async def put_document(self, uri: XCAPUri, document: bytes, check_etag: CheckETagType) -> StatusResponse: + result = await super(NotifyingStorage, self).put_document(uri, document, check_etag) if result.succeed: - self.notifier.on_change(uri, result.old_etag, result.etag) + result.background = BackgroundTask(self.notifier.on_change, uri, result.old_etag, result.etag) return result - def delete_document(self, uri, check_etag): - d = super(NotifyingStorage, self).delete_document(uri, check_etag) - d.addCallback(lambda result: self._on_delete(result, uri)) - return d - - def _on_delete(self, result, uri): + async def delete_document(self, uri: XCAPUri, check_etag: CheckETagType) -> StatusResponse: + result = await super(NotifyingStorage, self).delete_document(uri, check_etag) if result.succeed: - self.notifier.on_change(uri, result.old_etag, None) + result.background = BackgroundTask(self.notifier.on_change, uri, result.old_etag, None) return result -if Config.publish_xcapdiff: +PasswordChecker = PasswordChecker + +Storage: Union[type[DatabaseStorage], type[NotifyingStorage]] = DatabaseStorage + +if OpensipsConfig.publish_xcapdiff: Storage = NotifyingStorage -else: - Storage = database.Storage -installSignalHandlers = database.installSignalHandlers +installSignalHandlers = False diff --git a/xcap/backend/sipthor.py b/xcap/backend/sipthor.py index 8652aa6..3ba160f 100644 --- a/xcap/backend/sipthor.py +++ b/xcap/backend/sipthor.py @@ -1,590 +1,662 @@ +import asyncio +import json import re import signal -import cjson - -from formencode import validators - from application import log +from application.configuration import ConfigSection, ConfigSetting +from application.configuration.datatypes import IPAddress from application.notification import IObserver, NotificationCenter +from application.process import process from application.python import Null from application.python.types import Singleton from application.system import host -from application.process import process -from application.configuration import ConfigSection, ConfigSetting -from application.configuration.datatypes import IPAddress - -from sqlobject import sqlhub, connectionForURI, SQLObject, AND -from sqlobject import StringCol, IntCol, DateTimeCol, SOBLOBCol, Col -from sqlobject import MultipleJoin, ForeignKey - -from zope.interface import implements -from twisted.internet import reactor -from twisted.internet import defer -from twisted.internet.defer import Deferred, maybeDeferred -from twisted.cred.checkers import ICredentialsChecker -from twisted.cred.credentials import IUsernamePassword, IUsernameHashedPassword -from twisted.cred.error import UnauthorizedLogin - -from thor.link import ControlLink, Notification, Request -from thor.eventservice import EventServiceClient, ThorEvent -from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity - +from formencode import validators from gnutls.interfaces.twisted import TLSContext, X509Credentials - -from sipsimple.core import Engine, FromHeader, Header, Publication, RouteHeader, SIPURI -from sipsimple.threading import run_in_twisted_thread from sipsimple.configuration.datatypes import Port - +from sipsimple.core import (SIPURI, Engine, FromHeader, Header, Publication, + RouteHeader) +from sipsimple.threading import run_in_twisted_thread +from sqlmodel import Session, select +from sqlobject import (AND, Col, 
DateTimeCol, ForeignKey, IntCol, MultipleJoin, + SOBLOBCol, SQLObject, StringCol, connectionForURI, + sqlhub) +from starlette.background import BackgroundTask, BackgroundTasks +from thor.entities import GenericThorEntity as ThorEntity +from thor.entities import ThorEntitiesRoleMap +from thor.eventservice import EventServiceClient, ThorEvent +from thor.link import ControlLink, Notification, Request +from twisted.cred.checkers import ICredentialsChecker +from twisted.cred.credentials import IUsernameHashedPassword, IUsernamePassword +from twisted.cred.error import UnauthorizedLogin +from twisted.internet import asyncioreactor as reactor +from twisted.internet import defer +from twisted.internet.defer import Deferred, inlineCallbacks, maybeDeferred +from zope.interface import implementer import xcap -from xcap.tls import Certificate, PrivateKey -from xcap.backend import StatusResponse +from xcap.backend import BackendInterface, StatusResponse +from xcap.configuration import ServerConfig +from xcap.configuration.datatypes import DatabaseURI from xcap.datatypes import XCAPRootURI +from xcap.db.manager import get_auth_db_session, get_db_session from xcap.dbutil import make_random_etag +from xcap.errors import NotFound +from xcap.tls import Certificate, PrivateKey from xcap.xcapdiff import Notifier class ThorNodeConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ + __cfgfile__ = "config.ini" __section__ = 'ThorNetwork' domain = "sipthor.net" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) class ServerConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ + __cfgfile__ = "config.ini" # Link to project documentation __section__ = 'Server' address = ConfigSetting(type=IPAddress, value='0.0.0.0') root = ConfigSetting(type=XCAPRootURI, value=None) - tcp_port = ConfigSetting(type=Port, value=35060) + tcp_port = 35060 class JSONValidator(validators.Validator): def to_python(self, value, state): if value is None: return None try: - return cjson.decode(value) + return json.loads(value) except Exception: raise validators.Invalid("expected a decodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state) def from_python(self, value, state): if value is None: return None try: - return cjson.encode(value) + return json.dumps(value) except Exception: raise validators.Invalid("expected an encodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state) class SOJSONCol(SOBLOBCol): def createValidators(self): return [JSONValidator()] + super(SOJSONCol, self).createValidators() class JSONCol(Col): baseClass = SOJSONCol - -class SipAccount(SQLObject): - class sqlmeta: - table = 'sip_accounts_meta' - username = StringCol(length=64) - domain = StringCol(length=64) - firstName = StringCol(length=64) - lastName = StringCol(length=64) - email = StringCol(length=64) - customerId = IntCol(default=0) - resellerId = IntCol(default=0) - ownerId = IntCol(default=0) - changeDate = DateTimeCol(default=DateTimeCol.now) - ## joins - data = MultipleJoin('SipAccountData', joinColumn='account_id') - - def _set_profile(self, value): - data = list(self.data) - if not data: +import asyncio +from typing import List, Optional + +from pydantic import BaseModel +from sqlalchemy import JSON, Column +# from typing import Optional, Any +from sqlalchemy.orm.attributes import flag_modified +from sqlmodel import Field, 
Relationship, SQLModel + + +class DataObject(BaseModel): + class Config: + # Allow extra fields in the data object and treat them as attributes + extra = "allow" + + +class SipAccountData(SQLModel, table=True): + __tablename__ = 'sip_accounts_data' + id: int = Field(default=None, primary_key=True) + account_id: int = Field(default=None, foreign_key="sip_accounts_meta.id") + profile: Optional[dict] = Field(default=None, sa_column=Column(JSON)) + + account: "SipAccount" = Relationship(back_populates="data", + sa_relationship_kwargs={"lazy": "joined"}, + ) + +class SipAccount(SQLModel, table=True): + __tablename__ = 'sip_accounts_meta' + id: int = Field(default=None, primary_key=True) + username: str = Field(max_length=64) + domain: str = Field(max_length=64) + first_name: Optional[str] = Field(default=None, max_length=64) + last_name: Optional[str] = Field(default=None, max_length=64) + email: Optional[str] = Field(default=None, max_length=64) + customer_id: int = Field(default=0) + reseller_id: int = Field(default=0) + owner_id: int = Field(default=0) + change_date: Optional[str] = Field(default=None) + + # Relationships + data: List[SipAccountData] = Relationship(back_populates="account", + sa_relationship_kwargs={"lazy": "joined"}, + # cascade='all, delete-orphan' + ) + + def set_profile(self, value: dict): + # this replaces the method to set the profile +# data = list(self.data) + if not self.data: SipAccountData(account=self, profile=value) else: - data[0].profile = value - - def _get_profile(self): - return self.data[0].profile - - def set(self, **kwargs): - kwargs = kwargs.copy() - profile = kwargs.pop('profile', None) - SQLObject.set(self, **kwargs) - if profile is not None: - self._set_profile(profile) - - -class SipAccountData(SQLObject): - class sqlmeta: - table = 'sip_accounts_data' - account = ForeignKey('SipAccount', cascade=True) - profile = JSONCol() - - -class ThorEntityAddress(str): + flag_modified(self.data[0], "profile") + self.data[0].profile = value + + @property + def profile(self) -> Optional[dict]: + return self.data[0].profile if self.data else None + + # def __setattr__(self, name, value): + # """ + # Override __setattr__ to automatically handle updates to attributes. + # This is where we can implement custom logic for specific fields, such as profile. 
+ # """ + # # Handle special case for `profile` + # if name == "profile": + # print("name is profile") + # if self.data: + # # If data exists, set profile on the first related record + # print(f"set data \n{self.data[0].profile}\n to \n{value}\n") + # self.data[0].profile = value + # else: + # # Otherwise, create a new SipAccountData record with the profile + # new_data = SipAccountData(account_id=self.id, profile=value) + # self.data.append(new_data) + # else: + # # For other fields, just use the default behavior + # super().__setattr__(name, value) + + + @profile.setter + def profile(self, value: Optional[str]): + self.set_profile(value) + # """Setter for the profile to the first SipAccountData.""" + # print(f"setter name is profile {self.data}") + # if self.data: + # self.data[0].profile = value + # else: + # # If no related SipAccountData exists, create one + # new_data = SipAccountData(account_id=self.id, profile=value) + # self.data.append(new_data) + # # Track the modification of the SipAccountData object + # if self.data: + # # Add the first SipAccountData instance to the session, if modified + # db_session.add(self.data[0]) # Explicitly add to session +# class SipAccountData(SQLObject): +# class sqlmeta: +# table = 'sip_accounts_data' +# account = ForeignKey('SipAccount', cascade=True) +# profile = JSONCol() + +from application.notification import (IObserver, NotificationCenter, + NotificationData) + +# class ThorEntityAddress(str): +# def __new__(cls, ip, control_port=None, version='unknown'): +# instance = super().__new__(cls, ip) +# instance.ip = ip +# instance.version = version +# instance.control_port = control_port +# return instance + +class ThorEntityAddress(bytes): def __new__(cls, ip, control_port=None, version='unknown'): - instance = str.__new__(cls, ip) + instance = super().__new__(cls, ip.encode('utf-8')) instance.ip = ip instance.version = version instance.control_port = control_port return instance - class GetSIPWatchers(Request): def __new__(cls, account): command = "get sip_watchers for %s" % account instance = Request.__new__(cls, command) return instance - class XCAPProvisioning(EventServiceClient, metaclass=Singleton): topics = ["Thor.Members"] def __init__(self): - self._database = DatabaseConnection() - self.node = ThorEntity(host.default_ip if ServerConfig.address == '0.0.0.0' else ServerConfig.address, ['xcap_server'], version=xcap.__version__) + self.node = ThorEntity(host.default_ip if ServerConfig.address == '0.0.0.0' else ServerConfig.address, ['xcap_server'], control_port=25061 ,version=xcap.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True tls_context = TLSContext(credentials) self.control = ControlLink(tls_context) EventServiceClient.__init__(self, ThorNodeConfig.domain, tls_context) process.signals.add_handler(signal.SIGHUP, self._handle_signal) process.signals.add_handler(signal.SIGINT, self._handle_signal) process.signals.add_handler(signal.SIGTERM, self._handle_signal) def _disconnect_all(self, result): self.control.disconnect_all() EventServiceClient._disconnect_all(self, result) def lookup(self, key): network = self.networks.get("sip_proxy", None) if network is None: return None try: node = network.lookup_node(key) except LookupError: node = None except Exception: log.exception() node = None return 
node def notify(self, operation, entity_type, entity): node = self.lookup(entity) if node is not None: if node.control_port is None: log.error("Could not send notify because node %s has no control port" % node.ip) return self.control.send_request(Notification("notify %s %s %s" % (operation, entity_type, entity)), (node.ip, node.control_port)) - def get_watchers(self, key): + async def get_watchers(self, key): + """ + Fetch watchers asynchronously. + This method is called from asyncio code, so it will + convert the Deferred returned by Twisted to a Future. + """ + # Get the Deferred from the Twisted code + deferred = self._get_watchers(key) + + # Wrap the Twisted Deferred into an asyncio Future and await it + result = await self._deferred_to_future(deferred) + + return result + + async def _deferred_to_future(self, deferred): + """ + Convert a Twisted Deferred into an asyncio Future. + This allows us to await the Deferred in an async function. + """ + # Wrap the Deferred into an asyncio Future + loop = asyncio.get_event_loop() + future = loop.create_future() + + # Add a callback that will set the result on the asyncio Future when the Deferred is done + deferred.addCallback(future.set_result) + deferred.addErrback(future.set_exception) + + return await future + + def _get_watchers(self, key): node = self.lookup(key) if node is None: return defer.fail("no nodes found when searching for key %s" % str(key)) if node.control_port is None: return defer.fail("could not send notify because node %s has no control port" % node.ip) request = GetSIPWatchers(key) request.deferred = Deferred() + self.control.send_request(request, (node.ip, node.control_port)) return request.deferred def handle_event(self, event): - # print "Received event: %s" % event +# print("Received event: %s" % event) networks = self.networks role_map = ThorEntitiesRoleMap(event.message) ## mapping between role names and lists of nodes with that role thor_databases = role_map.get('thor_database', []) if thor_databases: - thor_databases.sort(lambda x, y: cmp(x.priority, y.priority) or cmp(x.ip, y.ip)) + # thor_databases.sort(lambda x, y: cmp(x.priority, y.priority) or cmp(x.ip, y.ip)) + thor_databases.sort(key=lambda x: (x.priority, x.ip)) dburi = thor_databases[0].dburi else: dburi = None - self._database.update_dburi(dburi) + # print(f"set updated {dburi}") +# configure_db_connection(dburi) + NotificationCenter().post_notification('db_uri', self, DatabaseURI(dburi)) + #loop = asyncio.get_event_loop() + #loop.call_soon_threadsafe(configure_db_connection, uri) + # self._database.update_dburi(dburi) all_roles = list(role_map.keys()) + list(networks.keys()) for role in all_roles: try: network = networks[role] ## avoid setdefault here because it always evaluates the 2nd argument except KeyError: from thor import network as thor_network if role in ["thor_manager", "thor_monitor", "provisioning_server", "media_relay", "thor_database"]: continue else: network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([ThorEntityAddress(node.ip, getattr(node, 'control_port', None), getattr(node, 'version', 'unknown')) for node in role_map.get(role, [])]) old_nodes = set(network.nodes) + # for item in new_nodes: + # print(item.control_port) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) - self.control.discard_link((node.ip, node.control_port)) - log.info('Removed %s nodes: %s' % (role, ', '.join(removed_nodes))) + 
self.control.discard_link(node) + plural = len(removed_nodes) != 1 and 's' or '' + log.info("Removed %s node%s: %s" % (role, plural, ', '.join([node.decode() for node in removed_nodes]))) if added_nodes: for node in added_nodes: + # print(type(network)) + # print(type(node)) + # print(node.control_port) network.add_node(node) - log.info('Added %s nodes: %s' % (role, ', '.join(added_nodes))) + # new = network.lookup_node(node) + # print(f'{new} - {new.control_port}') + plural = len(added_nodes) != 1 and 's' or '' + log.info("Added %s node%s: %s" % (role, plural, ', '.join([node.decode() for node in added_nodes]))) + # print(networks.nodes) # print('Thor %s nodes: %s' % (role, str(network.nodes))) -class NotFound(Exception): - pass - +# class NotFound(HTTPError): +# pass +# class NoDatabase(Exception): pass class DatabaseConnection(object, metaclass=Singleton): - def __init__(self): - self.dburi = None + # def __init__(self): + # self.dburi = None - # Methods to be called from the Twisted thread: - def put(self, uri, document, check_etag, new_etag): - defer = Deferred() + async def put(self, uri, document, check_etag, new_etag): operation = lambda profile: self._put_operation(uri, document, check_etag, new_etag, profile) - reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) - return defer + return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, True) - def delete(self, uri, check_etag): - defer = Deferred() + async def delete(self, uri, check_etag): operation = lambda profile: self._delete_operation(uri, check_etag, profile) - reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) - return defer + return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, True) - def delete_all(self, uri): - defer = Deferred() + async def delete_all(self, uri): operation = lambda profile: self._delete_all_operation(uri, profile) - reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) - return defer + return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, True) - def get(self, uri): - defer = Deferred() + async def get(self, uri): operation = lambda profile: self._get_operation(uri, profile) - reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer) + return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, False) return defer - def get_profile(self, username, domain): - defer = Deferred() - reactor.callInThread(self.retrieve_profile, username, domain, lambda profile: profile, False, defer) + async def get_profile(self, username, domain): + return await self.retrieve_profile(username, domain, lambda profile: profile, False) return defer - def get_documents_list(self, uri): - defer = Deferred() + async def get_documents_list(self, uri): operation = lambda profile: self._get_documents_list_operation(uri, profile) - reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer) - return defer + return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, False) - - # Methods to be called in a separate thread: def _put_operation(self, uri, document, check_etag, new_etag, profile): xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: found = False etag = None 
check_etag(None, False) else: found = True check_etag(etag) xcap_app = xcap_docs.setdefault(uri.application_id, {}) xcap_app[uri.doc_selector.document_path] = (document, new_etag) return found, etag, new_etag def _delete_operation(self, uri, check_etag, profile): xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: raise NotFound() check_etag(etag) del(xcap_docs[uri.application_id][uri.doc_selector.document_path]) return (etag) def _delete_all_operation(self, uri, profile): xcap_docs = profile.setdefault("xcap", {}) xcap_docs.clear() return None def _get_operation(self, uri, profile): try: xcap_docs = profile["xcap"] doc, etag = xcap_docs[uri.application_id][uri.doc_selector.document_path] except KeyError: raise NotFound() return doc, etag def _get_documents_list_operation(self, uri, profile): try: xcap_docs = profile["xcap"] except KeyError: raise NotFound() return xcap_docs - def retrieve_profile(self, username, domain, operation, update, defer): - transaction = None - try: - if self.dburi is None: - raise NoDatabase() - transaction = sqlhub.processConnection.transaction() - try: - db_account = SipAccount.select(AND(SipAccount.q.username == username, SipAccount.q.domain == domain), connection = transaction, forUpdate = update)[0] - except IndexError: + + async def retrieve_profile(self, username, domain, operation, update): + async with get_db_session() as db_session: + query = await db_session.execute(select(SipAccount).where( + SipAccount.username == username, SipAccount.domain == domain)) + db_result = query.first() + if not db_result: raise NotFound() - profile = db_account.profile - result = operation(profile) # NB: may modify profile! + profile = db_result[0].profile + result = operation(profile) if update: - db_account.profile = profile - transaction.commit(close=True) - except Exception as e: - if transaction: - transaction.rollback() - reactor.callFromThread(defer.errback, e) - else: - reactor.callFromThread(defer.callback, result) - finally: - if transaction: - transaction.cache.clear() - - def update_dburi(self, dburi): - if self.dburi != dburi: - if self.dburi is not None: - sqlhub.processConnection.close() - if dburi is None: - sqlhub.processConnection - else: - sqlhub.processConnection = connectionForURI(dburi) - self.dburi = dburi - - + db_result[0].profile = profile + await db_session.commit() + await db_session.refresh(db_result[0]) + return result + +class PasswordChecker(object): + async def query_user(self, credentials): + async with get_auth_db_session() as db_session: + result = await db_session.execute(select(SipAccount).where( + SipAccount.username == credentials.username, SipAccount.domain == credentials.realm)) + result = result.first() + if result: + return [DataObject(**result[0].profile)] + return result + + +@implementer(ICredentialsChecker) class SipthorPasswordChecker(object): - implements(ICredentialsChecker) credentialInterfaces = (IUsernamePassword, IUsernameHashedPassword) def __init__(self): self._database = DatabaseConnection() def _query_credentials(self, credentials): username, domain = credentials.username.split('@', 1)[0], credentials.realm result = self._database.get_profile(username, domain) result.addCallback(self._got_query_results, credentials) result.addErrback(self._got_unsuccessfull) return result def _got_unsuccessfull(self, failure): failure.trap(NotFound) raise UnauthorizedLogin("Unauthorized login") def _got_query_results(self, profile, credentials): 
return self._authenticate_credentials(profile, credentials) def _authenticate_credentials(self, profile, credentials): raise NotImplementedError def _checkedPassword(self, matched, username, realm): if matched: username = username.split('@', 1)[0] ## this is the avatar ID return "%s@%s" % (username, realm) else: raise UnauthorizedLogin("Unauthorized login") def requestAvatarId(self, credentials): """Return the avatar ID for the credentials which must have the username and realm attributes, or an UnauthorizedLogin in case of a failure.""" d = self._query_credentials(credentials) return d +@implementer(ICredentialsChecker) class PlainPasswordChecker(SipthorPasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored in plain text.""" - implements(ICredentialsChecker) def _authenticate_credentials(self, profile, credentials): return maybeDeferred( credentials.checkPassword, profile["password"]).addCallback( self._checkedPassword, credentials.username, credentials.realm) +@implementer(ICredentialsChecker) class HashPasswordChecker(SipthorPasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored as MD5 hashes.""" - implements(ICredentialsChecker) def _authenticate_credentials(self, profile, credentials): return maybeDeferred( credentials.checkHash, profile["ha1"]).addCallback( self._checkedPassword, credentials.username, credentials.realm) - +@implementer(IObserver) class SIPNotifier(object, metaclass=Singleton): - implements(IObserver) def __init__(self): self.provisioning = XCAPProvisioning() self.engine = Engine() self.engine.start( - ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, - tcp_port=ServerConfig.tcp_port, - user_agent="OpenXCAP %s" % xcap.__version__, + ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, + #tcp_port=ServerConfig.tcp_port, + user_agent="OpenXCAP %s" % xcap.__version__, ) - def send_publish(self, uri, body): + + def send_publish(self, uri, body) -> None: uri = re.sub("^(sip:|sips:)", "", uri) destination_node = self.provisioning.lookup(uri) + if destination_node is not None: # TODO: add configuration settings for SIP transport. 
-Saul publication = Publication(FromHeader(SIPURI(uri)), "xcap-diff", "application/xcap-diff+xml", duration=0, extra_headers=[Header('Thor-Scope', 'publish-xcap')]) NotificationCenter().add_observer(self, sender=publication) - route_header = RouteHeader(SIPURI(host=str(destination_node), port='5060', parameters=dict(transport='tcp'))) + route_header = RouteHeader(SIPURI(host=destination_node.decode(), port='5060', parameters=dict(transport='tcp'))) publication.publish(body, route_header, timeout=5) - @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPPublicationDidSucceed(self, notification): log.info('PUBLISH xcap-diff sent to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) def _NH_SIPPublicationDidEnd(self, notification): #log.info('PUBLISH for xcap-diff event ended for %s' % notification.sender.from_header.uri) NotificationCenter().remove_observer(self, sender=notification.sender) def _NH_SIPPublicationDidFail(self, notification): log.info('PUBLISH xcap-diff failed to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) NotificationCenter().remove_observer(self, sender=notification.sender) -class Storage(object, metaclass=Singleton): +class Storage(BackendInterface): def __init__(self): self._database = DatabaseConnection() self._provisioning = XCAPProvisioning() self._sip_notifier = SIPNotifier() self._notifier = Notifier(ServerConfig.root, self._sip_notifier.send_publish) - def _normalize_document_path(self, uri): - if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"): - # some clients e.g. counterpath's eyebeam save presence rules under - # different filenames between versions and they expect to find the same - # information, thus we are forcing all presence rules documents to be - # saved under "index.xml" default filename - uri.doc_selector.document_path = "index.xml" - - def get_document(self, uri, check_etag): + async def get_document(self, uri, check_etag): self._normalize_document_path(uri) - result = self._database.get(uri) - result.addCallback(self._got_document, check_etag) + result = await self._database.get(uri) + return self._got_document(result, check_etag) result.addErrback(self._eb_not_found) return result - def _eb_not_found(self, failure): - failure.trap(NotFound) - return StatusResponse(404) - def _got_document(self, xxx_todo_changeme, check_etag): (doc, etag) = xxx_todo_changeme check_etag(etag) return StatusResponse(200, etag, doc.encode('utf-8')) - def put_document(self, uri, document, check_etag): + async def put_document(self, uri, document, check_etag): document = document.decode('utf-8') self._normalize_document_path(uri) etag = make_random_etag(uri) - result = self._database.put(uri, document, check_etag, etag) - result.addCallback(self._cb_put, uri, "%s@%s" % (uri.user.username, uri.user.domain)) - result.addErrback(self._eb_not_found) + result = await self._database.put(uri, document, check_etag, etag) + return self._cb_put(result, uri, "%s@%s" % (uri.user.username, uri.user.domain)) return result def _cb_put(self, result, uri, thor_key): if result[0]: code = 200 else: code = 201 - self._provisioning.notify("update", "sip_account", thor_key) - self._notifier.on_change(uri, result[1], result[2]) - return StatusResponse(code, result[2]) + task = BackgroundTasks() + task.add_task(BackgroundTask(self._provisioning.notify, "update", "sip_account", 
thor_key)) + task.add_task(BackgroundTask(self._notifier.on_change, uri, result[1], result[2])) + return StatusResponse(code, result[2], background=task) - def delete_documents(self, uri): - result = self._database.delete_all(uri) - result.addCallback(self._cb_delete_all, uri, "%s@%s" % (uri.user.username, uri.user.domain)) - result.addErrback(self._eb_not_found) - return result + async def delete_documents(self, uri): + result = await self._database.delete_all(uri) + return self._cb_delete_all(result, uri, "%s@%s" % (uri.user.username, uri.user.domain)) def _cb_delete_all(self, result, uri, thor_key): - self._provisioning.notify("update", "sip_account", thor_key) - return StatusResponse(200) + task = BackgroundTasks() + task.add_task(BackgroundTask(self._provisioning.notify, "update", "sip_account", thor_key)) + return StatusResponse(200, background=task) - def delete_document(self, uri, check_etag): + async def delete_document(self, uri, check_etag): self._normalize_document_path(uri) - result = self._database.delete(uri, check_etag) - result.addCallback(self._cb_delete, uri, "%s@%s" % (uri.user.username, uri.user.domain)) - result.addErrback(self._eb_not_found) - return result + result = await self._database.delete(uri, check_etag) + return self._cb_delete(result, uri, "%s@%s" % (uri.user.username, uri.user.domain)) def _cb_delete(self, result, uri, thor_key): - self._provisioning.notify("update", "sip_account", thor_key) - self._notifier.on_change(uri, result[1], None) - return StatusResponse(200) + task = BackgroundTasks() + # print(result) + task.add_task(BackgroundTask(self._provisioning.notify, "update", "sip_account", thor_key)) + task.add_task(BackgroundTask(self._notifier.on_change, uri, result[1], None)) + return StatusResponse(200, background=task) - def get_watchers(self, uri): + async def get_watchers(self, uri): thor_key = "%s@%s" % (uri.user.username, uri.user.domain) - result = self._provisioning.get_watchers(thor_key) - result.addCallback(self._get_watchers_decode) - return result + result = await self._provisioning.get_watchers(thor_key) + return self._get_watchers_decode(result) def _get_watchers_decode(self, response): if response.code == 200: - watchers = cjson.decode(response.data) + watchers = json.loads(response.data) for watcher in watchers: watcher["online"] = str(watcher["online"]).lower() return watchers else: print("error: %s" % response) - def get_documents_list(self, uri): - result = self._database.get_documents_list(uri) - result.addCallback(self._got_documents_list) - result.addErrback(self._got_documents_list_error) - return result + async def get_documents_list(self, uri): + result = await self._database.get_documents_list(uri) + return self._got_documents_list(result) def _got_documents_list(self, xcap_docs): docs = {} if xcap_docs: for k, v in xcap_docs.items(): for k2, v2 in v.items(): if k in docs: docs[k].append((k2, v2[1])) else: docs[k] = [(k2, v2[1])] return docs - def _got_documents_list_error(self, failure): - failure.trap(NotFound) - return {} - installSignalHandlers = False
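
Usage sketch (not part of the diff): the snippet below illustrates how an HTTP handler could consume the new async backend API, awaiting a BackendInterface method, mapping the returned StatusResponse onto a Starlette Response and forwarding the optional background tasks (for example the xcap-diff PUBLISH scheduled by NotifyingStorage). The handler name, the storage argument and the no-op check_etag callable are illustrative assumptions, not code from OpenXCAP.

from starlette.responses import Response

from xcap.backend import BackendInterface, StatusResponse
from xcap.uri import XCAPUri


async def handle_xcap_get(storage: BackendInterface, uri: XCAPUri) -> Response:
    # Hypothetical no-op etag checker; a real handler would compare the stored
    # etag against the If-Match / If-None-Match request headers and raise on mismatch.
    def check_etag(etag, exists=True):
        pass

    result: StatusResponse = await storage.get_document(uri, check_etag)
    headers = {"ETag": result.etag} if result.etag else {}
    return Response(content=result.data or b"",
                    status_code=result.code,
                    headers=headers,
                    media_type="application/xml",
                    background=result.background)  # background tasks run after the response is sent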