diff --git a/xcap/backend/__init__.py b/xcap/backend/__init__.py index 8d1ad54..d14bcfd 100644 --- a/xcap/backend/__init__.py +++ b/xcap/backend/__init__.py @@ -1,66 +1,68 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import Callable, Optional, Union from pydantic import BaseModel -from starlette.background import BackgroundTasks +from starlette.background import BackgroundTask, BackgroundTasks from xcap.uri import XCAPUri class CustomBaseModel(BaseModel): def __init__(self, *args, **kwargs): # Get the field names from the class field_names = list(self.__annotations__.keys()) - # Check if we have the right number of positional arguments if len(args) > len(field_names): raise ValueError(f"Too many positional arguments. Expected at most {len(field_names)}") # Assign positional arguments to keyword arguments dynamically for i, field in enumerate(field_names): if i < len(args): # Only assign if we have enough positional arguments kwargs[field] = args[i] # Now call the parent __init__ method to handle the rest super().__init__(**kwargs) class StatusResponse(CustomBaseModel): code: int etag: Optional[str] = None data: Optional[bytes] = None # If this is binary data, it should be bytes old_etag: Optional[str] = None - background: Optional[BackgroundTasks] = None + background: Union[BackgroundTasks, BackgroundTask, None] = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) @property def succeed(self): return 200 <= self.code <= 299 class Config: arbitrary_types_allowed = True class BackendInterface(ABC): def _normalize_document_path(self, uri): if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"): # some clients e.g. counterpath's eyebeam save presence rules under # different filenames between versions and they expect to find the same # information, thus we are forcing all presence rules documents to be # saved under "index.xml" default filename uri.doc_selector.document_path = "index.xml" @abstractmethod - async def get_document(self, uri: XCAPUri, check_etag) -> Optional[StatusResponse]: + async def get_document(self, uri: XCAPUri, check_etag: Callable) -> Optional[StatusResponse]: """Retrieve data for a specific resource.""" pass @abstractmethod - async def put_document(self, uri: XCAPUri, document: bytes, check_etag) -> Optional[StatusResponse]: + async def put_document(self, uri: XCAPUri, document: bytes, check_etag: Callable) -> Optional[StatusResponse]: """Retrieve data for a specific resource.""" pass @abstractmethod - async def delete_document(self, uri: XCAPUri, check_etag) -> Optional[StatusResponse]: + async def delete_document(self, uri: XCAPUri, check_etag: Callable) -> Optional[StatusResponse]: """Retrieve data for a specific resource.""" pass diff --git a/xcap/backend/database.py b/xcap/backend/database.py index 9bdaf5e..bb33be0 100644 --- a/xcap/backend/database.py +++ b/xcap/backend/database.py @@ -1,168 +1,173 @@ +from typing import Any, Callable, Optional + from sqlalchemy.exc import SQLAlchemyError from sqlmodel import select +from xcap.authentication import Credentials from xcap.backend import BackendInterface, StatusResponse from xcap.db.manager import get_auth_db_session, get_db_session from xcap.db.models import XCAP, Subscriber, Watcher from xcap.dbutil import make_random_etag +from xcap.uri import XCAPUri class Error(Exception): def __init__(self): if hasattr(self, 'msg'): return Exception.__init__(self, self.msg) else: return Exception.__init__(self) class MultipleResultsError(Error): """This should never happen. If it did happen. that means either the table was corrupted or there's a logic error""" def __init__(self, params): Exception.__init__(self, 'database request has more than one result: ' + repr(params)) class DeleteFailed(Error): msg = 'DELETE request failed' class PasswordChecker(object): - async def query_user(self, credentials): + async def query_user(self, credentials) -> Any: async with get_auth_db_session() as db_session: result = await db_session.execute(select(Subscriber).where( Subscriber.username == credentials.username, Subscriber.domain == credentials.realm)) return result.first() class DatabaseStorage(BackendInterface): app_mapping = {"pres-rules" : 1 << 1, "resource-lists" : 1 << 2, "rls-services" : 1 << 3, "pidf-manipulation" : 1 << 4, "org.openmobilealliance.pres-rules" : 1 << 5, "org.openmobilealliance.pres-content" : 1 << 6, "org.openxcap.dialog-rules" : 1 << 7, "test-app" : 0} async def fetch_document(self, uri): username, domain = uri.user.username, uri.user.domain self._normalize_document_path(uri) doc_type = self.app_mapping[uri.application_id] document_path = uri.doc_selector.document_path async with get_db_session() as db_session: result = await db_session.execute(select(XCAP).where( XCAP.username == username, XCAP.domain == domain, XCAP.doc_type == doc_type, XCAP.doc_uri == document_path)) results = result.all() if results and len(results) > 1: raise MultipleResultsError({"username": username, "domain": domain, "doc_type": doc_type, "document_path": document_path}) return results - async def get_document(self, uri, check_etag): + async def get_document(self, uri: XCAPUri, check_etag: Callable) -> Optional[StatusResponse]: results = await self.fetch_document(uri) if results: doc = results[0][0].doc etag = results[0][0].etag if isinstance(doc, str): doc = doc.encode('utf-8') check_etag(etag) + return StatusResponse(200, etag, doc) return StatusResponse(404) - async def put_document(self, uri, document, check_etag): + async def put_document(self, uri: XCAPUri, document: bytes, check_etag: Callable) -> Optional[StatusResponse]: results = await self.fetch_document(uri) if results: existing_doc = results[0][0] old_etag = existing_doc.etag check_etag(old_etag) # Check if etag matches etag = make_random_etag(uri) # Generate a new etag old_data = existing_doc # Update fields params = { "doc": document, "etag": etag } for key, value in params.items(): setattr(old_data, key, value) async with get_db_session() as db_session: db_session.add(old_data) await db_session.commit() await db_session.refresh(old_data) return StatusResponse(200, etag, old_etag=old_etag) # If no document exists, create a new one username, domain = uri.user.username, uri.user.domain doc_type = self.app_mapping[uri.application_id] document_path = uri.doc_selector.document_path check_etag(None, False) etag = make_random_etag(uri) # Generate a new etag for the new document params = { "username": username, "domain": domain, "doc_type": doc_type, "etag": etag, "doc": document, "doc_uri": document_path } new_doc = XCAP(**params) async with get_db_session() as db_session: db_session.add(new_doc) await db_session.commit() await db_session.refresh(new_doc) return StatusResponse(201, etag) - async def delete_document(self, uri, check_etag): + async def delete_document(self, uri: XCAPUri, check_etag: Callable) -> Optional[StatusResponse]: results = await self.fetch_document(uri) if results: etag = results[0][0].etag check_etag(etag) async with get_db_session() as db_session: try: await db_session.delete(results[0][0]) await db_session.commit() except SQLAlchemyError: raise DeleteFailed return StatusResponse(200, old_etag=etag) return StatusResponse(404) async def get_watchers(self, uri): status_mapping = {1: "allow", 2: "confirm", 3: "deny"} presentity_uri = "sip:%s@%s" % (uri.user.username, uri.user.domain) async with get_db_session() as db_session: result = await db_session.execute(select(Watcher).where( Watcher.presentity_uri == presentity_uri)) result_list = result.all() watchers = [{"id": "%s@%s" % (w_user, w_domain), "status": status_mapping.get(subs_status, "unknown"), "online": "false"} for w_user, w_domain, subs_status in result_list] result = await db_session.execute(select(Watcher).where( Watcher.presentity_uri == presentity_uri, Watcher.event == 'presence')) result_list = result.all() active_watchers = set("%s@%s" % pair for pair in result) for watcher in watchers: if watcher["id"] in active_watchers: watcher["online"] = "true" return watchers Storage = DatabaseStorage diff --git a/xcap/backend/opensips.py b/xcap/backend/opensips.py index d80d73c..9150cce 100644 --- a/xcap/backend/opensips.py +++ b/xcap/backend/opensips.py @@ -1,123 +1,122 @@ import re -from typing import Union +from typing import Callable, Optional, Union from application import log from application.configuration import ConfigSetting from application.notification import (IObserver, Notification, NotificationCenter) from application.python import Null from application.python.types import Singleton from sipsimple.configuration.datatypes import SIPProxyAddress from sipsimple.core import (SIPURI, Engine, FromHeader, Header, Publication, RouteHeader) from sipsimple.lookup import DNSLookup from sipsimple.threading.green import run_in_green_thread from starlette.background import BackgroundTask from zope.interface import implementer from xcap import __version__ from xcap.backend import StatusResponse from xcap.backend.database import DatabaseStorage, PasswordChecker from xcap.configuration import OpensipsConfig as XCAPOpensipsConfig from xcap.configuration import ServerConfig -from xcap.types import CheckETagType from xcap.uri import XCAPUri from xcap.xcapdiff import Notifier class OpensipsConfig(XCAPOpensipsConfig): outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) @implementer(IObserver) class SIPNotifier(object, metaclass=Singleton): def __init__(self): self.engine = Engine() self.engine.start( ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, tcp_port=ServerConfig.tcp_port, user_agent="OpenXCAP %s" % __version__, ) self.sip_prefix_re = re.compile('^sips?:') try: outbound_sip_proxy = OpensipsConfig.outbound_sip_proxy self.outbound_proxy = SIPURI(host=outbound_sip_proxy.host, port=outbound_sip_proxy.port, parameters={'transport': 'tcp'}) except ValueError: log.warning('Invalid SIP proxy address specified: %s' % OpensipsConfig.outbound_sip_proxy) self.outbound_proxy = None NotificationCenter().add_observer(self) @run_in_green_thread def send_publish(self, uri, body=None): if self.outbound_proxy is None or body is None: return self.body = body self.uri = self.sip_prefix_re.sub('', uri) lookup = DNSLookup() NotificationCenter().add_observer(self, sender=lookup) lookup.lookup_sip_proxy(self.outbound_proxy, ["udp", "tcp", "tls"]) def handle_notification(self, notification: Notification) -> None: handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification: Notification) -> None: notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) publication = Publication(FromHeader(SIPURI(self.uri)), "xcap-diff", "application/xcap-diff+xml", duration=0, extra_headers=[Header('Thor-Scope', 'publish-xcap')]) notification_center.add_observer(self, sender=publication) route = notification.data.result[0] route_header = RouteHeader(route.uri) publication.publish(self.body, route_header, timeout=5) def _NH_DNSLookupDidFail(self, notification: Notification) -> None: notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPPublicationDidSucceed(self, notification: Notification) -> None: log.info('PUBLISH for xcap-diff event successfully sent to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) def _NH_SIPPublicationDidEnd(self, notification: Notification) -> None: log.info('PUBLISH for xcap-diff event ended for %s' % notification.sender.from_header.uri) notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPPublicationDidFail(self, notification: Notification) -> None: log.info('PUBLISH for xcap-diff event failed to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) notification.center.remove_observer(self, sender=notification.sender) class NotifyingStorage(DatabaseStorage): def __init__(self): super(NotifyingStorage, self).__init__() self._sip_notifier = SIPNotifier() self.notifier = Notifier(ServerConfig.root, self._sip_notifier.send_publish) - async def put_document(self, uri: XCAPUri, document: bytes, check_etag: CheckETagType) -> StatusResponse: + async def put_document(self, uri: XCAPUri, document: bytes, check_etag: Callable) -> Optional[StatusResponse]: result = await super(NotifyingStorage, self).put_document(uri, document, check_etag) - if result.succeed: + if result and result.succeed: result.background = BackgroundTask(self.notifier.on_change, uri, result.old_etag, result.etag) return result - async def delete_document(self, uri: XCAPUri, check_etag: CheckETagType) -> StatusResponse: + async def delete_document(self, uri: XCAPUri, check_etag: Callable) -> Optional[StatusResponse]: result = await super(NotifyingStorage, self).delete_document(uri, check_etag) - if result.succeed: + if result and result.succeed: result.background = BackgroundTask(self.notifier.on_change, uri, result.old_etag, None) return result PasswordChecker = PasswordChecker Storage: Union[type[DatabaseStorage], type[NotifyingStorage]] = DatabaseStorage if OpensipsConfig.publish_xcapdiff: Storage = NotifyingStorage installSignalHandlers = False diff --git a/xcap/backend/sipthor.py b/xcap/backend/sipthor.py index 01f796d..6e04f30 100644 --- a/xcap/backend/sipthor.py +++ b/xcap/backend/sipthor.py @@ -1,407 +1,412 @@ import asyncio import json import re import signal +from typing import Any, Callable, Optional from application import log -from application.notification import IObserver, NotificationCenter +from application.notification import (IObserver, Notification, + NotificationCenter) from application.process import process from application.python import Null from application.python.types import Singleton from application.system import host from gnutls.interfaces.twisted import TLSContext, X509Credentials from sipsimple.core import (SIPURI, Engine, FromHeader, Header, Publication, RouteHeader) from sqlmodel import select from starlette.background import BackgroundTask, BackgroundTasks from thor.entities import GenericThorEntity as ThorEntity from thor.entities import ThorEntitiesRoleMap from thor.eventservice import EventServiceClient, ThorEvent -from thor.link import ControlLink, Notification, Request +from thor.link import ControlLink +from thor.link import Notification as ThorNotification +from thor.link import Request +from thor.link import Response as ThorResponse from twisted.internet import defer from twisted.internet.defer import Deferred from zope.interface import implementer import xcap from xcap.backend import BackendInterface, StatusResponse from xcap.configuration import ServerConfig, ThorNodeConfig from xcap.configuration.datatypes import DatabaseURI from xcap.db.manager import get_auth_db_session, get_db_session from xcap.db.models import DataObject, SipAccount from xcap.dbutil import make_random_etag from xcap.errors import NotFound +from xcap.uri import XCAPUri from xcap.xcapdiff import Notifier class ThorEntityAddress(bytes): - def __new__(cls, ip, control_port=None, version='unknown'): + ip: str + control_port: Optional[int] + version: str + + def __new__(cls, ip: str, control_port: Optional[int] = None, version: str = 'unknown') -> 'ThorEntityAddress': instance = super().__new__(cls, ip.encode('utf-8')) instance.ip = ip instance.version = version instance.control_port = control_port return instance class GetSIPWatchers(Request): - def __new__(cls, account): + def __new__(cls, account: str) -> 'GetSIPWatchers': command = "get sip_watchers for %s" % account instance = Request.__new__(cls, command) return instance class XCAPProvisioning(EventServiceClient, metaclass=Singleton): topics = ["Thor.Members"] def __init__(self): self.node = ThorEntity(host.default_ip if ServerConfig.address == '0.0.0.0' else ServerConfig.address, ['xcap_server'], control_port=25061, version=xcap.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True tls_context = TLSContext(credentials) self.control = ControlLink(tls_context) EventServiceClient.__init__(self, ThorNodeConfig.domain, tls_context) process.signals.add_handler(signal.SIGHUP, self._handle_signal) process.signals.add_handler(signal.SIGINT, self._handle_signal) process.signals.add_handler(signal.SIGTERM, self._handle_signal) - def _disconnect_all(self, result): + def _disconnect_all(self, result) -> None: self.control.disconnect_all() EventServiceClient._disconnect_all(self, result) - def lookup(self, key): + def lookup(self, key: str) -> Optional[ThorEntityAddress]: network = self.networks.get("sip_proxy", None) if network is None: return None try: node = network.lookup_node(key) except LookupError: node = None except Exception: log.exception() node = None return node - def notify(self, operation, entity_type, entity): + def notify(self, operation: str, entity_type: str, entity: str) -> None: node = self.lookup(entity) if node is not None: if node.control_port is None: log.error("Could not send notify because node %s has no control port" % node.ip) return - self.control.send_request(Notification("notify %s %s %s" % (operation, entity_type, entity)), (node.ip, node.control_port)) + self.control.send_request(ThorNotification("notify %s %s %s" % (operation, entity_type, entity)), (node.ip, node.control_port)) - async def get_watchers(self, key): + async def get_watchers(self, key: str) -> ThorResponse: """ Fetch watchers asynchronously. This method is called from asyncio code, so it will convert the Deferred returned by Twisted to a Future. """ # Get the Deferred from the Twisted code deferred = self._get_watchers(key) # Wrap the Twisted Deferred into an asyncio Future and await it result = await self._deferred_to_future(deferred) - return result - async def _deferred_to_future(self, deferred): + async def _deferred_to_future(self, deferred: Deferred) -> ThorResponse: """ Convert a Twisted Deferred into an asyncio Future. This allows us to await the Deferred in an async function. """ # Wrap the Deferred into an asyncio Future loop = asyncio.get_event_loop() future = loop.create_future() # Add a callback that will set the result on the asyncio Future when the Deferred is done deferred.addCallback(future.set_result) deferred.addErrback(future.set_exception) return await future - def _get_watchers(self, key): + def _get_watchers(self, key: str) -> Deferred: node = self.lookup(key) if node is None: - return defer.fail("no nodes found when searching for key %s" % str(key)) + return defer.fail(Exception(f"no nodes found when searching for key {key}")) if node.control_port is None: - return defer.fail("could not send notify because node %s has no control port" % node.ip) + return defer.fail(Exception(f"could not send notify because node {node.ip} has no control port")) request = GetSIPWatchers(key) request.deferred = Deferred() self.control.send_request(request, (node.ip, node.control_port)) return request.deferred - def handle_event(self, event): + def handle_event(self, event: ThorEvent) -> None: networks = self.networks role_map = ThorEntitiesRoleMap(event.message) ## mapping between role names and lists of nodes with that role thor_databases = role_map.get('thor_database', []) if thor_databases: # thor_databases.sort(lambda x, y: cmp(x.priority, y.priority) or cmp(x.ip, y.ip)) thor_databases.sort(key=lambda x: (x.priority, x.ip)) dburi = thor_databases[0].dburi else: dburi = None NotificationCenter().post_notification('db_uri', self, DatabaseURI(dburi)) all_roles = list(role_map.keys()) + list(networks.keys()) for role in all_roles: try: network = networks[role] ## avoid setdefault here because it always evaluates the 2nd argument except KeyError: from thor import network as thor_network if role in ["thor_manager", "thor_monitor", "provisioning_server", "media_relay", "thor_database"]: continue else: network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([ThorEntityAddress(node.ip, getattr(node, 'control_port', None), getattr(node, 'version', 'unknown')) for node in role_map.get(role, [])]) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) self.control.discard_link(node) plural = len(removed_nodes) != 1 and 's' or '' log.info("Removed %s node%s: %s" % (role, plural, ', '.join([node.decode() for node in removed_nodes]))) if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.info("Added %s node%s: %s" % (role, plural, ', '.join([node.decode() for node in added_nodes]))) # print('Thor %s nodes: %s' % (role, str(network.nodes))) class NoDatabase(Exception): pass class DatabaseConnection(object, metaclass=Singleton): - async def put(self, uri, document, check_etag, new_etag): + async def put(self, uri: XCAPUri, document: str, check_etag: Callable, new_etag: str) -> tuple: operation = lambda profile: self._put_operation(uri, document, check_etag, new_etag, profile) return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, True) - async def delete(self, uri, check_etag): + async def delete(self, uri: XCAPUri, check_etag: Callable) -> tuple: operation = lambda profile: self._delete_operation(uri, check_etag, profile) return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, True) - async def delete_all(self, uri): + async def delete_all(self, uri: XCAPUri) -> None: operation = lambda profile: self._delete_all_operation(uri, profile) return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, True) - async def get(self, uri): + async def get(self, uri: XCAPUri) -> tuple: operation = lambda profile: self._get_operation(uri, profile) return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, False) - return defer - async def get_profile(self, username, domain): + async def get_profile(self, username: str, domain: str) -> dict: return await self.retrieve_profile(username, domain, lambda profile: profile, False) - return defer - async def get_documents_list(self, uri): + async def get_documents_list(self, uri: XCAPUri) -> dict: operation = lambda profile: self._get_documents_list_operation(uri, profile) return await self.retrieve_profile(uri.user.username, uri.user.domain, operation, False) - def _put_operation(self, uri, document, check_etag, new_etag, profile): + def _put_operation(self, uri: XCAPUri, document: str, check_etag: Callable, new_etag: str, profile: dict) -> tuple: xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: found = False etag = None check_etag(None, False) else: found = True check_etag(etag) xcap_app = xcap_docs.setdefault(uri.application_id, {}) xcap_app[uri.doc_selector.document_path] = (document, new_etag) return found, etag, new_etag - def _delete_operation(self, uri, check_etag, profile): + def _delete_operation(self, uri: XCAPUri, check_etag: Callable, profile: dict) -> tuple: xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: raise NotFound() check_etag(etag) del xcap_docs[uri.application_id][uri.doc_selector.document_path] return (etag) - def _delete_all_operation(self, uri, profile): + def _delete_all_operation(self, uri: XCAPUri, profile: dict) -> None: xcap_docs = profile.setdefault("xcap", {}) xcap_docs.clear() return None - def _get_operation(self, uri, profile): + def _get_operation(self, uri: XCAPUri, profile: dict) -> tuple: try: xcap_docs = profile["xcap"] doc, etag = xcap_docs[uri.application_id][uri.doc_selector.document_path] except KeyError: raise NotFound() return doc, etag - def _get_documents_list_operation(self, uri, profile): + def _get_documents_list_operation(self, uri: XCAPUri, profile: dict) -> dict: try: xcap_docs = profile["xcap"] except KeyError: raise NotFound() return xcap_docs - async def retrieve_profile(self, username, domain, operation, update): + async def retrieve_profile(self, username: Optional[str], domain: Optional[str], operation: Callable, update: bool) -> Any: async with get_db_session() as db_session: query = await db_session.execute(select(SipAccount).where( SipAccount.username == username, SipAccount.domain == domain)) db_result = query.first() if not db_result: raise NotFound() profile = db_result[0].profile result = operation(profile) if update: db_result[0].profile = profile await db_session.commit() await db_session.refresh(db_result[0]) return result class PasswordChecker(object): - async def query_user(self, credentials): + async def query_user(self, credentials) -> Any: async with get_auth_db_session() as db_session: - result = await db_session.execute(select(SipAccount).where( + db_result = await db_session.execute(select(SipAccount).where( SipAccount.username == credentials.username, SipAccount.domain == credentials.realm)) - result = result.first() + result = db_result.first() if result: return [DataObject(**result[0].profile)] return result @implementer(IObserver) class SIPNotifier(object, metaclass=Singleton): def __init__(self): self.provisioning = XCAPProvisioning() self.engine = Engine() self.engine.start( ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, user_agent="OpenXCAP %s" % xcap.__version__, ) - def send_publish(self, uri, body) -> None: + def send_publish(self, uri: str, body: str) -> None: uri = re.sub("^(sip:|sips:)", "", uri) destination_node = self.provisioning.lookup(uri) if destination_node is not None: # TODO: add configuration settings for SIP transport. -Saul publication = Publication(FromHeader(SIPURI(uri)), "xcap-diff", "application/xcap-diff+xml", duration=0, extra_headers=[Header('Thor-Scope', 'publish-xcap')]) NotificationCenter().add_observer(self, sender=publication) route_header = RouteHeader(SIPURI(host=destination_node.decode(), port='5060', parameters=dict(transport='tcp'))) publication.publish(body, route_header, timeout=5) - def handle_notification(self, notification): + def handle_notification(self, notification: Notification) -> None: handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) - def _NH_SIPPublicationDidSucceed(self, notification): + def _NH_SIPPublicationDidSucceed(self, notification: Notification) -> None: log.info('PUBLISH xcap-diff sent to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) - def _NH_SIPPublicationDidEnd(self, notification): + def _NH_SIPPublicationDidEnd(self, notification: Notification) -> None: # log.info('PUBLISH for xcap-diff event ended for %s' % notification.sender.from_header.uri) NotificationCenter().remove_observer(self, sender=notification.sender) - def _NH_SIPPublicationDidFail(self, notification): + def _NH_SIPPublicationDidFail(self, notification: Notification) -> None: log.info('PUBLISH xcap-diff failed to %s for %s' % (notification.data.route_header.uri, notification.sender.from_header.uri)) NotificationCenter().remove_observer(self, sender=notification.sender) class Storage(BackendInterface): def __init__(self): self._database = DatabaseConnection() self._provisioning = XCAPProvisioning() self._sip_notifier = SIPNotifier() self._notifier = Notifier(ServerConfig.root, self._sip_notifier.send_publish) - async def get_document(self, uri, check_etag): + async def get_document(self, uri: XCAPUri, check_etag: Callable) -> Optional[StatusResponse]: self._normalize_document_path(uri) result = await self._database.get(uri) return self._got_document(result, check_etag) - result.addErrback(self._eb_not_found) - return result - def _got_document(self, xxx_todo_changeme, check_etag): - (doc, etag) = xxx_todo_changeme + def _got_document(self, result: tuple, check_etag: Callable) -> StatusResponse: + (doc, etag) = result check_etag(etag) return StatusResponse(200, etag, doc.encode('utf-8')) - async def put_document(self, uri, document, check_etag): - document = document.decode('utf-8') + async def put_document(self, uri: XCAPUri, document: bytes, check_etag: Callable) -> Optional[StatusResponse]: + decoded_document = document.decode('utf-8') self._normalize_document_path(uri) etag = make_random_etag(uri) - result = await self._database.put(uri, document, check_etag, etag) + result = await self._database.put(uri, decoded_document, check_etag, etag) return self._cb_put(result, uri, "%s@%s" % (uri.user.username, uri.user.domain)) - return result - def _cb_put(self, result, uri, thor_key): + def _cb_put(self, result: tuple, uri: XCAPUri, thor_key: str) -> StatusResponse: if result[0]: code = 200 else: code = 201 task = BackgroundTasks() task.add_task(BackgroundTask(self._provisioning.notify, "update", "sip_account", thor_key)) task.add_task(BackgroundTask(self._notifier.on_change, uri, result[1], result[2])) return StatusResponse(code, result[2], background=task) - async def delete_documents(self, uri): + async def delete_documents(self, uri: XCAPUri) -> Optional[StatusResponse]: result = await self._database.delete_all(uri) return self._cb_delete_all(result, uri, "%s@%s" % (uri.user.username, uri.user.domain)) - def _cb_delete_all(self, result, uri, thor_key): + def _cb_delete_all(self, result: Optional[str], uri: XCAPUri, thor_key: str) -> StatusResponse: task = BackgroundTasks() task.add_task(BackgroundTask(self._provisioning.notify, "update", "sip_account", thor_key)) return StatusResponse(200, background=task) - async def delete_document(self, uri, check_etag): + async def delete_document(self, uri: XCAPUri, check_etag: Callable) -> Optional[StatusResponse]: self._normalize_document_path(uri) result = await self._database.delete(uri, check_etag) return self._cb_delete(result, uri, "%s@%s" % (uri.user.username, uri.user.domain)) - def _cb_delete(self, result, uri, thor_key): + def _cb_delete(self, result: tuple, uri: XCAPUri, thor_key: str) -> StatusResponse: task = BackgroundTasks() task.add_task(BackgroundTask(self._provisioning.notify, "update", "sip_account", thor_key)) task.add_task(BackgroundTask(self._notifier.on_change, uri, result[1], None)) return StatusResponse(200, background=task) - async def get_watchers(self, uri): + async def get_watchers(self, uri: XCAPUri) -> dict: thor_key = "%s@%s" % (uri.user.username, uri.user.domain) result = await self._provisioning.get_watchers(thor_key) return self._get_watchers_decode(result) - def _get_watchers_decode(self, response): + def _get_watchers_decode(self, response: ThorResponse) -> dict: if response.code == 200: watchers = json.loads(response.data) for watcher in watchers: watcher["online"] = str(watcher["online"]).lower() return watchers else: print("error: %s" % response) + return {} - async def get_documents_list(self, uri): + async def get_documents_list(self, uri: XCAPUri) -> dict: result = await self._database.get_documents_list(uri) return self._got_documents_list(result) - def _got_documents_list(self, xcap_docs): - docs = {} + def _got_documents_list(self, xcap_docs: dict) -> dict: + docs: dict = {} if xcap_docs: for k, v in xcap_docs.items(): for k2, v2 in v.items(): if k in docs: docs[k].append((k2, v2[1])) else: docs[k] = [(k2, v2[1])] return docs installSignalHandlers = False diff --git a/xcap/types.py b/xcap/types.py index a9c3cd5..2805501 100644 --- a/xcap/types.py +++ b/xcap/types.py @@ -1,12 +1,10 @@ -from typing import Any, Callable, Coroutine, Union +from typing import Any, Callable, Coroutine, Optional, Union from fastapi import Request from xcap.configuration.datatypes import XCAPRootURI CheckETagType = Callable[[Request, str, bool], None] PublishFunction = Callable[[str, str], None] - - PublishWrapper = Callable[[str, XCAPRootURI], Coroutine[Any, Any, None]]