diff --git a/xcap/appusage/__init__.py b/xcap/appusage/__init__.py index b242d7d..62777a5 100644 --- a/xcap/appusage/__init__.py +++ b/xcap/appusage/__init__.py @@ -1,374 +1,347 @@ """XCAP application usage module""" import os import sys +from io import BytesIO -from io import StringIO -from lxml import etree - -from application.configuration import ConfigSection, ConfigSetting -from application.configuration.datatypes import StringList from application import log +from application.configuration import ConfigSetting +from application.configuration.datatypes import StringList +from lxml import etree -import xcap -from xcap import errors -from xcap import element +from xcap import element, errors from xcap.backend import StatusResponse +from xcap.configuration import ServerConfig as XCAPServerConfig +from xcap.configuration.datatypes import Backend -class Backend(object): - """Configuration datatype, used to select a backend module from the configuration file.""" - def __new__(typ, value): - value = value.lower() - try: - return __import__('xcap.backend.%s' % value, globals(), locals(), ['']) - except (ImportError, AssertionError) as e: - log.critical('Cannot load %r backend module: %s' % (value, e)) - sys.exit(1) - except Exception: - log.exception() - sys.exit(1) - - -class ServerConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Server' - +class ServerConfig(XCAPServerConfig): backend = ConfigSetting(type=Backend, value=None) disabled_applications = ConfigSetting(type=StringList, value=[]) document_validation = True + if ServerConfig.backend is None: log.critical('OpenXCAP needs a backend to be specified in order to run') sys.exit(1) class ApplicationUsage(object): """Base class defining an XCAP application""" id = None ## the Application Unique ID (AUID) default_ns = None ## the default XML namespace mime_type = None ## the MIME type schema_file = None ## filename of the schema for the application def __init__(self, storage): ## the XML schema that defines valid documents for this application if self.schema_file: xml_schema_doc = etree.parse(open(os.path.join(os.path.dirname(__file__), 'xml-schemas', self.schema_file), 'r')) self.xml_schema = etree.XMLSchema(xml_schema_doc) else: class EverythingIsValid(object): def __call__(self, *args, **kw): return True + def validate(self, *args, **kw): return True self.xml_schema = EverythingIsValid() if storage is not None: self.storage = storage ## Validation def _check_UTF8_encoding(self, xml_doc): """Check if the document is UTF8 encoded. Raise an NotUTF8Error if it's not.""" if xml_doc.docinfo.encoding.lower() != 'utf-8': raise errors.NotUTF8Error(comment='document encoding is %s' % xml_doc.docinfo.encoding) def _check_schema_validation(self, xml_doc): """Check if the given XCAP document validates against the application's schema""" if not self.xml_schema(xml_doc): raise errors.SchemaValidationError(comment=self.xml_schema.error_log) def _check_additional_constraints(self, xml_doc): """Check additional validations constraints for this XCAP document. Should be overriden in subclasses if specified by the application usage, and raise a ConstraintFailureError if needed.""" def validate_document(self, xcap_doc): """Check if a document is valid for this application.""" try: - xml_doc = etree.parse(StringIO(xcap_doc)) + xml_doc = etree.parse(BytesIO(xcap_doc)) # XXX do not use TreeBuilder here - except etree.XMLSyntaxError as ex: - ex.http_error = errors.NotWellFormedError(comment=str(ex)) - raise + except etree.XMLSyntaxError as e: + raise errors.NotWellFormedError(comment=str(e)) except Exception as ex: - ex.http_error = errors.NotWellFormedError() - raise + raise errors.NotWellFormedError() self._check_UTF8_encoding(xml_doc) if ServerConfig.document_validation: self._check_schema_validation(xml_doc) self._check_additional_constraints(xml_doc) ## Authorization policy def is_authorized(self, xcap_user, xcap_uri): """Default authorization policy. Authorizes an XCAPUser for an XCAPUri. Return True if the user is authorized, False otherwise.""" return xcap_user and xcap_user == xcap_uri.user ## Document management def _not_implemented(self, context): raise errors.ResourceNotFound("Application %s does not implement %s context" % (self.id, context)) def get_document(self, uri, check_etag): context = uri.doc_selector.context if context == 'global': return self.get_document_global(uri, check_etag) elif context == 'users': return self.get_document_local(uri, check_etag) else: self._not_implemented(context) def get_document_global(self, uri, check_etag): self._not_implemented('global') - def get_document_local(self, uri, check_etag): - return self.storage.get_document(uri, check_etag) + async def get_document_local(self, uri, check_etag): + return await self.storage.get_document(uri, check_etag) - def put_document(self, uri, document, check_etag): + async def put_document(self, uri, document, check_etag): self.validate_document(document) - return self.storage.put_document(uri, document, check_etag) + return await self.storage.put_document(uri, document, check_etag) - def delete_document(self, uri, check_etag): - return self.storage.delete_document(uri, check_etag) + async def delete_document(self, uri, check_etag): + return await self.storage.delete_document(uri, check_etag) ## Element management - def _cb_put_element(self, response, uri, element_body, check_etag): + async def _cb_put_element(self, response, uri, element_body, check_etag): """This is called when the document that relates to the element is retrieved.""" if response.code == 404: ### XXX let the storate raise raise errors.NoParentError ### catch error in errback and attach http_error fixed_element_selector = uri.node_selector.element_selector.fix_star(element_body) try: result = element.put(response.data, fixed_element_selector, element_body) except element.SelectorError as ex: - ex.http_error = errors.NoParentError(comment=str(ex)) - raise + raise errors.NoParentError(comment=str(ex)) if result is None: raise errors.NoParentError new_document, created = result get_result = element.get(new_document, uri.node_selector.element_selector) if get_result != element_body.strip(): raise errors.CannotInsertError('PUT request failed GET(PUT(x))==x invariant') - d = self.put_document(uri, new_document, check_etag) + d = await self.put_document(uri, new_document, check_etag) def set_201_code(response): try: - if response.code==200: + if response.code == 200: response.code = 201 except AttributeError: pass return response if created: - d.addCallback(set_201_code) + set_201_code(d) return d - def put_element(self, uri, element_body, check_etag): + async def put_element(self, uri, element_body, check_etag): try: element.check_xml_fragment(element_body) except element.sax.SAXParseException as ex: - ex.http_error = errors.NotXMLFragmentError(comment=str(ex)) - raise + raise errors.NotXMLFragmentError(comment=str(ex)) except Exception as ex: - ex.http_error = errors.NotXMLFragmentError() - raise - d = self.get_document(uri, check_etag) - return d.addCallbacks(self._cb_put_element, callbackArgs=(uri, element_body, check_etag)) + raise errors.NotXMLFragmentError() + d = await self.get_document(uri, check_etag) + return await self._cb_put_element(d, uri, element_body, check_etag) def _cb_get_element(self, response, uri): """This is called when the document related to the element is retrieved.""" if response.code == 404: ## XXX why not let the storage raise? raise errors.ResourceNotFound("The requested document %s was not found on this server" % uri.doc_selector) result = element.get(response.data, uri.node_selector.element_selector) if not result: msg = "The requested element %s was not found in the document %s" % (uri.node_selector, uri.doc_selector) raise errors.ResourceNotFound(msg) return StatusResponse(200, response.etag, result) - def get_element(self, uri, check_etag): - d = self.get_document(uri, check_etag) - return d.addCallbacks(self._cb_get_element, callbackArgs=(uri, )) + async def get_element(self, uri, check_etag): + d = await self.get_document(uri, check_etag) + return self._cb_get_element(d, uri) - def _cb_delete_element(self, response, uri, check_etag): + async def _cb_delete_element(self, response, uri, check_etag): if response.code == 404: raise errors.ResourceNotFound("The requested document %s was not found on this server" % uri.doc_selector) new_document = element.delete(response.data, uri.node_selector.element_selector) if not new_document: raise errors.ResourceNotFound get_result = element.find(new_document, uri.node_selector.element_selector) if get_result: raise errors.CannotDeleteError('DELETE request failed GET(DELETE(x))==404 invariant') - return self.put_document(uri, new_document, check_etag) + return await self.put_document(uri, new_document, check_etag) - def delete_element(self, uri, check_etag): - d = self.get_document(uri, check_etag) - return d.addCallbacks(self._cb_delete_element, callbackArgs=(uri, check_etag)) + async def delete_element(self, uri, check_etag): + d = await self.get_document(uri, check_etag) + return await self._cb_delete_element(d, uri, check_etag) ## Attribute management def _cb_get_attribute(self, response, uri): """This is called when the document that relates to the attribute is retrieved.""" if response.code == 404: raise errors.ResourceNotFound document = response.data - xml_doc = etree.parse(StringIO(document)) + xml_doc = etree.parse(BytesIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: xpath = uri.node_selector.replace_default_prefix() attribute = xml_doc.xpath(xpath, namespaces = ns_dict) except Exception as ex: - ex.http_error = errors.ResourceNotFound() - raise + raise errors.ResourceNotFound if not attribute: raise errors.ResourceNotFound elif len(attribute) != 1: raise errors.ResourceNotFound('XPATH expression is ambiguous') # TODO # The server MUST NOT add namespace bindings representing namespaces # used by the element or its children, but declared in ancestor elements return StatusResponse(200, response.etag, attribute[0]) - def get_attribute(self, uri, check_etag): - d = self.get_document(uri, check_etag) - return d.addCallbacks(self._cb_get_attribute, callbackArgs=(uri, )) + async def get_attribute(self, uri, check_etag): + d = await self.get_document(uri, check_etag) + return self._cb_get_attribute(d, uri) - def _cb_delete_attribute(self, response, uri, check_etag): + async def _cb_delete_attribute(self, response, uri, check_etag): if response.code == 404: raise errors.ResourceNotFound document = response.data - xml_doc = etree.parse(StringIO(document)) + xml_doc = etree.parse(BytesIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: elem = xml_doc.xpath(uri.node_selector.replace_default_prefix(append_terminal=False),namespaces=ns_dict) except Exception as ex: - ex.http_error = errors.ResourceNotFound() - raise + raise errors.ResourceNotFound if not elem: raise errors.ResourceNotFound if len(elem) != 1: raise errors.ResourceNotFound('XPATH expression is ambiguous') elem = elem[0] attribute = uri.node_selector.terminal_selector.attribute if elem.get(attribute): ## check if the attribute exists XXX use KeyError instead del elem.attrib[attribute] else: raise errors.ResourceNotFound new_document = etree.tostring(xml_doc, encoding='UTF-8', xml_declaration=True) - return self.put_document(uri, new_document, check_etag) + return await self.put_document(uri, new_document, check_etag) - def delete_attribute(self, uri, check_etag): - d = self.get_document(uri, check_etag) - return d.addCallbacks(self._cb_delete_attribute, callbackArgs=(uri, check_etag)) + async def delete_attribute(self, uri, check_etag): + d = await self.get_document(uri, check_etag) + return await self._cb_delete_attribute(d, uri, check_etag) - def _cb_put_attribute(self, response, uri, attribute, check_etag): + async def _cb_put_attribute(self, response, uri, attribute, check_etag): """This is called when the document that relates to the element is retrieved.""" if response.code == 404: raise errors.NoParentError document = response.data - xml_doc = etree.parse(StringIO(document)) + xml_doc = etree.parse(BytesIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: elem = xml_doc.xpath(uri.node_selector.replace_default_prefix(append_terminal=False),namespaces=ns_dict) except Exception as ex: - ex.http_error = errors.NoParentError() - raise + raise errors.NoParentError if not elem: raise errors.NoParentError if len(elem) != 1: raise errors.NoParentError('XPATH expression is ambiguous') elem = elem[0] attr_name = uri.node_selector.terminal_selector.attribute elem.set(attr_name, attribute) new_document = etree.tostring(xml_doc, encoding='UTF-8', xml_declaration=True) - return self.put_document(uri, new_document, check_etag) + return await self.put_document(uri, new_document, check_etag) - def put_attribute(self, uri, attribute, check_etag): + async def put_attribute(self, uri, attribute, check_etag): ## TODO verify if the attribute is valid - d = self.get_document(uri, check_etag) - return d.addCallbacks(self._cb_put_attribute, callbackArgs=(uri, attribute, check_etag)) + d = await self.get_document(uri, check_etag) + return await self._cb_put_attribute(d, uri, attribute, check_etag) ## Namespace Bindings def _cb_get_ns_bindings(self, response, uri): """This is called when the document that relates to the element is retrieved.""" if response.code == 404: raise errors.ResourceNotFound document = response.data - xml_doc = etree.parse(StringIO(document)) + xml_doc = etree.parse(BytesIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: elem = xml_doc.xpath(uri.node_selector.replace_default_prefix(append_terminal=False),namespaces=ns_dict) except Exception as ex: - ex.http_error = errors.ResourceNotFound() - raise + raise errors.ResourceNotFound if not elem: raise errors.ResourceNotFound elif len(elem)!=1: raise errors.ResourceNotFound('XPATH expression is ambiguous') elem = elem[0] namespaces = '' for prefix, ns in list(elem.nsmap.items()): namespaces += ' xmlns%s="%s"' % (prefix and ':%s' % prefix or '', ns) result = '<%s %s/>' % (elem.tag, namespaces) return StatusResponse(200, response.etag, result) - def get_ns_bindings(self, uri, check_etag): - d = self.get_document(uri, check_etag) - return d.addCallbacks(self._cb_get_ns_bindings, callbackArgs=(uri, )) + async def get_ns_bindings(self, uri, check_etag): + d = await self.get_document(uri, check_etag) + return self._cb_get_ns_bindings(d, uri) from xcap.appusage.capabilities import XCAPCapabilitiesApplication from xcap.appusage.dialogrules import DialogRulesApplication from xcap.appusage.directory import XCAPDirectoryApplication from xcap.appusage.pidf import PIDFManipulationApplication from xcap.appusage.prescontent import PresContentApplication from xcap.appusage.presrules import PresenceRulesApplication from xcap.appusage.purge import PurgeApplication from xcap.appusage.resourcelists import ResourceListsApplication from xcap.appusage.rlsservices import RLSServicesApplication from xcap.appusage.test import TestApplication from xcap.appusage.watchers import WatchersApplication storage = ServerConfig.backend.Storage() applications = { DialogRulesApplication.id: DialogRulesApplication(storage), PIDFManipulationApplication.id: PIDFManipulationApplication(storage), PresenceRulesApplication.id: PresenceRulesApplication(storage), PresenceRulesApplication.oma_id: PresenceRulesApplication(storage), PurgeApplication.id: PurgeApplication(storage), ResourceListsApplication.id: ResourceListsApplication(storage), RLSServicesApplication.id: RLSServicesApplication(storage), TestApplication.id: TestApplication(storage), WatchersApplication.id: WatchersApplication(storage), XCAPCapabilitiesApplication.id: XCAPCapabilitiesApplication(), XCAPDirectoryApplication.id: XCAPDirectoryApplication(storage) } # public GET applications (GET is not challenged for auth) public_get_applications = {PresContentApplication.id: PresContentApplication(storage)} applications.update(public_get_applications) for application in ServerConfig.disabled_applications: applications.pop(application, None) namespaces = dict((k, v.default_ns) for (k, v) in list(applications.items())) def getApplicationForURI(xcap_uri): return applications.get(xcap_uri.application_id, None) __all__ = ['applications', 'namespaces', 'public_get_applications', 'getApplicationForURI', 'ApplicationUsage', 'Backend'] - - diff --git a/xcap/appusage/capabilities.py b/xcap/appusage/capabilities.py index 9622426..1200d58 100644 --- a/xcap/appusage/capabilities.py +++ b/xcap/appusage/capabilities.py @@ -1,44 +1,44 @@ from lxml import etree -from twisted.internet import defer +#from twisted.internet import defer from xcap import errors from xcap.appusage import ApplicationUsage from xcap.dbutil import make_etag from xcap.backend import StatusResponse class XCAPCapabilitiesApplication(ApplicationUsage): ## RFC 4825 id = "xcap-caps" default_ns = "urn:ietf:params:xml:ns:xcap-caps" - mime_type= "application/xcap-caps+xml" + mime_type = "application/xcap-caps+xml" def __init__(self): pass - def _get_document(self): + async def _get_document(self): if hasattr(self, 'doc'): return self.doc, self.etag root = etree.Element("xcap-caps", nsmap={None: self.default_ns}) auids = etree.SubElement(root, "auids") extensions = etree.SubElement(root, "extensions") namespaces = etree.SubElement(root, "namespaces") from xcap.appusage import applications for (id, app) in list(applications.items()): etree.SubElement(auids, "auid").text = id etree.SubElement(namespaces, "namespace").text = app.default_ns self.doc = etree.tostring(root, encoding="UTF-8", pretty_print=True, xml_declaration=True) self.etag = make_etag('xcap-caps', self.doc) return self.doc, self.etag - def get_document_global(self, uri, check_etag): - doc, etag = self._get_document() - return defer.succeed(StatusResponse(200, etag=etag, data=doc)) + async def get_document_global(self, uri, check_etag): + doc, etag = await self._get_document() + return StatusResponse(code=200, etag=etag, data=doc) def get_document_local(self, uri, check_etag): self._not_implemented('users') def put_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support PUT method") diff --git a/xcap/appusage/directory.py b/xcap/appusage/directory.py index 72947be..b44dca5 100644 --- a/xcap/appusage/directory.py +++ b/xcap/appusage/directory.py @@ -1,39 +1,39 @@ from lxml import etree -from twisted.internet import defer +# from twisted.internet import defer from xcap import errors from xcap.appusage import ApplicationUsage from xcap.backend import StatusResponse class XCAPDirectoryApplication(ApplicationUsage): id = "org.openmobilealliance.xcap-directory" default_ns = "urn:oma:xml:xdm:xcap-directory" mime_type= "application/vnd.oma.xcap-directory+xml" schema_file = "xcap-directory.xsd" def _docs_to_xml(self, docs, uri): sip_uri = "sip:%s@%s" % (uri.user.username, uri.user.domain) root = etree.Element("xcap-directory", nsmap={None: self.default_ns}) if docs: for k, v in docs.items(): folder = etree.SubElement(root, "folder", attrib={'auid': k}) for item in v: # We may have more than one document for the same application entry_uri = "%s/%s/users/%s/%s" % (uri.xcap_root, k, sip_uri, item[0]) entry = etree.SubElement(folder, "entry") entry.set("uri", entry_uri) entry.set("etag", item[1]) doc = etree.tostring(root, encoding="UTF-8", pretty_print=True, xml_declaration=True) #self.validate_document(doc) - return defer.succeed(StatusResponse(200, etag=None, data=doc)) + return StatusResponse(200, etag=None, data=doc) def get_document_local(self, uri, check_etag): docs_def = self.storage.get_documents_list(uri) docs_def.addCallback(self._docs_to_xml, uri) return docs_def def put_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support PUT method") diff --git a/xcap/appusage/prescontent.py b/xcap/appusage/prescontent.py index f0090c5..755ebdb 100644 --- a/xcap/appusage/prescontent.py +++ b/xcap/appusage/prescontent.py @@ -1,49 +1,49 @@ -from io import StringIO +from io import BytesIO from lxml import etree from xcap import errors from xcap.appusage import ApplicationUsage class PresContentApplication(ApplicationUsage): id = "org.openmobilealliance.pres-content" default_ns = "urn:oma:xml:prs:pres-content" mime_type = "application/vnd.oma.pres-content+xml" icon_mime_types = ('image/jpeg', 'image/gif', 'image/png') icon_encoding = 'base64' icon_max_size = 300*1024 def _validate_icon(self, document): mime_type = None encoding = None data = None - xml = StringIO(document) + xml = BytesIO(document) try: tree = etree.parse(xml) root = tree.getroot() ns = root.nsmap[None] for element in root: if element.tag == "{%s}mime-type" % ns: mime_type = element.text.lower() if element.tag == "{%s}encoding" % ns: encoding = element.text.lower() if element.tag == "{%s}data" % ns: data = element.text except etree.ParseError: raise errors.NotWellFormedError() else: if mime_type not in self.icon_mime_types: raise errors.ConstraintFailureError(phrase="Unsupported MIME type. Allowed MIME types: %s" % ','.join(self.icon_mime_types)) if encoding != self.icon_encoding: raise errors.ConstraintFailureError(phrase="Unsupported encoding. Allowed enconding: %s" % self.icon_encoding) if data is None: raise errors.ConstraintFailureError(phrase="No icon data was provided") if len(data) > self.icon_max_size: raise errors.ConstraintFailureError(phrase="Size limit exceeded, maximum allowed size is %d bytes" % self.icon_max_size) def put_document(self, uri, document, check_etag): if uri.doc_selector.document_path.startswith('oma_status-icon'): self._validate_icon(document) return self.storage.put_document(uri, document, check_etag) diff --git a/xcap/appusage/presrules.py b/xcap/appusage/presrules.py index e8d486a..83bd881 100644 --- a/xcap/appusage/presrules.py +++ b/xcap/appusage/presrules.py @@ -1,116 +1,100 @@ -from application.configuration import ConfigSection, ConfigSetting -from io import StringIO +from io import BytesIO from lxml import etree from urllib.parse import unquote -import xcap from xcap import errors from xcap.appusage import ApplicationUsage -from xcap.datatypes import XCAPRootURI +from xcap.configuration import ServerConfig, AuthenticationConfig from xcap.uri import XCAPUri from xcap.xpath import DocumentSelectorError, NodeParsingError -class AuthenticationConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Authentication' - - default_realm = ConfigSetting(type=str, value=None) - -class ServerConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Server' - - allow_external_references = False - root = ConfigSetting(type=XCAPRootURI, value=None) - - def parseExternalListURI(node_uri, default_realm): from xcap.appusage import namespaces xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise errors.ConstraintFailureError("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector == '/': raise errors.ConstraintFailureError("Resource selector missing") try: uri = XCAPUri(xcap_root, resource_selector, namespaces) except (DocumentSelectorError, NodeParsingError) as e: raise errors.ConstraintFailureError(phrase=str(e)) else: if uri.user.domain is None: uri.user.domain = default_realm return uri class PresenceRulesApplication(ApplicationUsage): id = "pres-rules" oma_id = "org.openmobilealliance.pres-rules" default_ns = "urn:ietf:params:xml:ns:pres-rules" mime_type = "application/auth-policy+xml" schema_file = 'presence-rules.xsd' def _check_external_list(self, external_list, node_uri): if not external_list: return external_list = unquote(external_list) external_list_uri = parseExternalListURI(external_list, AuthenticationConfig.default_realm) if external_list_uri.xcap_root != node_uri.xcap_root: raise errors.ConstraintFailureError(phrase="XCAP root in the external list doesn't match PUT requests'") if external_list_uri.user != node_uri.user: raise errors.ConstraintFailureError(phrase="Cannot link to another user's list") def _validate_rules(self, document, node_uri): common_policy_namespace = 'urn:ietf:params:xml:ns:common-policy' oma_namespace = 'urn:oma:xml:xdm:common-policy' actions_tag = '{%s}actions' % common_policy_namespace conditions_tag = '{%s}conditions' % common_policy_namespace identity_tag = '{%s}identity' % common_policy_namespace rule_tag = '{%s}rule' % common_policy_namespace transformations_tag = '{%s}transformations' % common_policy_namespace sub_handling_tag = '{%s}sub-handling' % self.default_ns oma_anonymous_request_tag = '{%s}anonymous-request' % oma_namespace oma_entry_tag = '{%s}entry' % oma_namespace oma_external_list_tag = '{%s}external-list' % oma_namespace oma_other_identity_tag = '{%s}other-identity' % oma_namespace try: - xml = StringIO(document) + xml = BytesIO(document) tree = etree.parse(xml) root = tree.getroot() if oma_namespace in list(root.nsmap.values()): # Condition constraints for element in root.iter(conditions_tag): if any([len(element.findall(item)) > 1 for item in (identity_tag, oma_external_list_tag, oma_other_identity_tag, oma_anonymous_request_tag)]): raise errors.ConstraintFailureError(phrase="Complex rules are not allowed") # Transformations constraints for rule in root.iter(rule_tag): actions = rule.find(actions_tag) if actions is not None: sub_handling = actions.find(sub_handling_tag) transformations = rule.find(transformations_tag) if sub_handling is not None and sub_handling.text != 'allow' and transformations is not None and transformations.getchildren(): raise errors.ConstraintFailureError(phrase="transformations element not allowed") # External list constraints if not ServerConfig.allow_external_references: for element in root.iter(oma_external_list_tag): for entry in element.iter(oma_entry_tag): self._check_external_list(entry.attrib.get('anc', None), node_uri) except etree.ParseError: raise errors.NotWellFormedError() def put_document(self, uri, document, check_etag): self.validate_document(document) self._validate_rules(document, uri) return self.storage.put_document(uri, document, check_etag) diff --git a/xcap/appusage/resourcelists.py b/xcap/appusage/resourcelists.py index 02bca4d..fd33937 100644 --- a/xcap/appusage/resourcelists.py +++ b/xcap/appusage/resourcelists.py @@ -1,149 +1,134 @@ -from application.configuration import ConfigSection, ConfigSetting -from io import StringIO +from io import BytesIO from lxml import etree from urllib.parse import unquote from urllib.parse import urlparse -import xcap from xcap import errors from xcap.appusage import ApplicationUsage -from xcap.datatypes import XCAPRootURI +from xcap.configuration import ServerConfig, AuthenticationConfig from xcap.uri import XCAPUri from xcap.xpath import DocumentSelectorError, NodeParsingError -class AuthenticationConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Authentication' - - default_realm = ConfigSetting(type=str, value=None) - -class ServerConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Server' - - allow_external_references = False - root = ConfigSetting(type=XCAPRootURI, value=None) - def parseExternalListURI(node_uri, default_realm): from xcap.appusage import namespaces xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise errors.ConstraintFailureError("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector == '/': raise errors.ConstraintFailureError("Resource selector missing") try: uri = XCAPUri(xcap_root, resource_selector, namespaces) except (DocumentSelectorError, NodeParsingError) as e: raise errors.ConstraintFailureError(phrase=str(e)) else: if uri.user.domain is None: uri.user.domain = default_realm return uri def get_xpath(elem): """Return XPATH expression to obtain elem in the document. This could be done better, of course, not using stars, but the real tags. But that would be much more complicated and I'm not sure if such effort is justified""" res = '' while elem is not None: parent = elem.getparent() if parent is None: res = '/*' + res else: res = '/*[%s]' % parent.index(elem) + res elem = parent return res def attribute_not_unique(elem, attr): raise errors.UniquenessFailureError(exists = get_xpath(elem) + '/@' + attr) class ResourceListsApplication(ApplicationUsage): # RFC 4826 id = "resource-lists" default_ns = "urn:ietf:params:xml:ns:resource-lists" mime_type= "application/resource-lists+xml" schema_file = 'resource-lists.xsd' @classmethod def check_list(cls, element, node_uri): - from xcap.authentication import parseNodeURI + from xcap.authentication.auth import parseNodeURI entry_tag = "{%s}entry" % cls.default_ns entry_ref_tag = "{%s}entry-ref" % cls.default_ns external_tag ="{%s}external" % cls.default_ns list_tag = "{%s}list" % cls.default_ns anchor_attrs = set() name_attrs = set() ref_attrs = set() uri_attrs = set() for child in element.getchildren(): if child.tag == list_tag: name = child.get("name") if name in name_attrs: attribute_not_unique(child, 'name') else: name_attrs.add(name) cls.check_list(child, node_uri) elif child.tag == entry_tag: uri = child.get("uri") if uri in uri_attrs: attribute_not_unique(child, 'uri') else: uri_attrs.add(uri) elif child.tag == entry_ref_tag: ref = child.get("ref") if ref in ref_attrs: attribute_not_unique(child, 'ref') else: try: ref = unquote(ref) ref_uri = parseNodeURI("%s/%s" % (node_uri.xcap_root, ref), AuthenticationConfig.default_realm) if not ServerConfig.allow_external_references and ref_uri.user != node_uri.user: raise errors.ConstraintFailureError(phrase="Cannot link to another users' list") try: if ref_uri.node_selector.element_selector[-1].name[1] != "entry": raise ValueError except LookupError: raise ValueError except (DocumentSelectorError, NodeParsingError) as e: raise errors.ConstraintFailureError(phrase=str(e)) except ValueError: raise errors.ConstraintFailureError else: ref_attrs.add(ref) elif child.tag == external_tag: anchor = child.get("anchor") if anchor in anchor_attrs: attribute_not_unique(child, 'anchor') else: anchor = unquote(anchor) if not ServerConfig.allow_external_references: external_list_uri = parseExternalListURI(anchor, AuthenticationConfig.default_realm) if external_list_uri.xcap_root != node_uri.xcap_root: raise errors.ConstraintFailureError(phrase="XCAP root in the external list doesn't match PUT requests'") if external_list_uri.user != node_uri.user: raise errors.ConstraintFailureError(phrase="Cannot link to another users' list") else: parsed_url = urlparse(anchor) if parsed_url.scheme not in ('http', 'https'): raise errors.ConstraintFailureError(phrase='Specified anchor is not a valid URL') else: anchor_attrs.add(anchor) def put_document(self, uri, document, check_etag): self.validate_document(document) # Check additional constraints (see section 3.4.5 of RFC 4826) - xml_doc = etree.parse(StringIO(document)) + xml_doc = etree.parse(BytesIO(document)) self.check_list(xml_doc.getroot(), uri) return self.storage.put_document(uri, document, check_etag) diff --git a/xcap/appusage/watchers.py b/xcap/appusage/watchers.py index 704830c..fd98819 100644 --- a/xcap/appusage/watchers.py +++ b/xcap/appusage/watchers.py @@ -1,35 +1,35 @@ from lxml import etree from xcap import errors from xcap.appusage import ApplicationUsage from xcap.dbutil import make_etag from xcap.backend import StatusResponse class WatchersApplication(ApplicationUsage): id = "org.openxcap.watchers" default_ns = "http://openxcap.org/ns/watchers" - mime_type= "application/xml" + mime_type = "application/xml" schema_file = 'watchers.xsd' # who needs schema for readonly application? def _watchers_to_xml(self, watchers, uri, check_etag): root = etree.Element("watchers", nsmap={None: self.default_ns}) for watcher in watchers: watcher_elem = etree.SubElement(root, "watcher") for name, value in watcher.items(): etree.SubElement(watcher_elem, name).text = value doc = etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=True) #self.validate_document(doc) etag = make_etag(uri, doc) check_etag(etag) return StatusResponse(200, data=doc, etag=etag) - def get_document_local(self, uri, check_etag): - watchers_def = self.storage.get_watchers(uri) - watchers_def.addCallback(self._watchers_to_xml, uri, check_etag) - return watchers_def + async def get_document_local(self, uri, check_etag): + watchers_def = await self.storage.get_watchers(uri) + return self._watchers_to_xml(watchers_def, uri, check_etag) + # return watchers_def def put_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support PUT method")