diff --git a/xcap/xpath.py b/xcap/xpath.py
index acb909b..0609573 100644
--- a/xcap/xpath.py
+++ b/xcap/xpath.py
@@ -1,375 +1,400 @@
import re
from application import log
from copy import copy
from lxml import _elementpath as ElementPath
from xml.sax.saxutils import quoteattr
__all__ = ['parse_node_selector', 'AttributeSelector', 'DocumentSelector', 'ElementSelector', 'NamespaceSelector', 'NodeSelector']
# Errors
+
class Error(ValueError):
    """Base class for the selector errors defined in this module."""


class NodeParsingError(Error):
    """Raised when a node selector cannot be parsed."""

    # Numeric code attached to the error class (400).
    status_code = 400
    # The attribute was previously called ``http_error``; the alias keeps
    # code that still reads the old name working.
    http_error = status_code


class DocumentSelectorError(Error):
    """Raised when a document selector is malformed."""

    # Numeric code attached to the error class (404).
    status_code = 404
    # Backward-compatible alias for the pre-rename attribute name.
    http_error = status_code
# XPath tokenizer
class List(list):
    """A ``list`` that also offers a dict-style ``get`` for safe indexing."""

    def get(self, index, default=None):
        """Return ``self[index]``, or *default* when the lookup fails.

        Only ``LookupError`` (e.g. an out-of-range index) is swallowed;
        negative indexes keep their normal list meaning.
        """
        try:
            return self[index]
        except LookupError:
            return default
class Op(str):
    """A token that is an operator / punctuation; compares like a plain ``str``."""

    # Distinguishes operator tokens from name tokens (see ``tag`` on Tag).
    tag = False
class Tag(str):
    """A token that is a name (or attribute value); compares like a plain ``str``."""

    # Marks the token as a name rather than an operator.
    tag = True
class XPathTokenizer(object):
    """Splits a node selector into a flat ``List`` of ``Op`` / ``Tag`` tokens."""

    @classmethod
    def tokens(cls, selector, namespaces=None):
        """Tokenize *selector* using lxml's ElementPath tokenizer.

        *namespaces* is an optional prefix -> URI mapping handed to the
        underlying tokenizer. It is never modified: a private copy is used.

        Returns a ``List`` whose items are ``Op`` (operators) or ``Tag``
        (names and attribute values). A token that directly follows ``=``
        must be quoted and has its quotes removed.

        Raises ``NodeParsingError`` when the value after ``=`` is not quoted
        or when the underlying tokenizer reports a ``SyntaxError``.

        >>> XPathTokenizer.tokens('resource-lists')
        ['resource-lists']
        >>> XPathTokenizer.tokens('list[@name="friends"]')
        ['list', '[', '@', 'name', '=', 'friends', ']']

        A quoted attribute value that itself contains unescaped double quotes
        (for example a URI ending in ``list[@name="mkting"]``) cannot be
        tokenized correctly. Escape the inner quotes before calling, e.g.
        percent-encoded as ``%22`` or as the entity ``&quot;``.
        """
        def unquote_attr_value(s):
            # XXX equivalent but differently encoded URIs are not considered equal
            if len(s) > 1 and s[0] == s[-1] and s[0] in '"\'':
                return s[1:-1]
            raise NodeParsingError

        # Work on a copy so the placeholder binding below never leaks into the
        # caller's mapping (or into a shared default argument).
        namespaces = dict(namespaces) if namespaces else {}
        if 'namespace::*' in selector:
            # NOTE(review): the tokenizer appears to read ``namespace::*`` as a
            # prefixed name, so the ``namespace`` prefix needs some binding;
            # confirm against the lxml version in use.
            namespaces.setdefault('namespace', 'namespace')

        tokens = List()
        prev = None
        try:
            for op, tag in ElementPath.xpath_tokenizer(selector, namespaces):
                if 'namespace::' in tag:
                    # Map a raw namespace axis straight to its selector object.
                    tokens.append(NamespaceSelector())
                    continue
                raw = op if op else tag
                value = unquote_attr_value(raw) if prev == '=' else raw
                x = Op(value) if op else Tag(value)
                tokens.append(x)
                prev = x
        except SyntaxError as e:
            raise NodeParsingError(e) from e
        return tokens
# XPath parsing
def read_element_tag(lst, index, namespace, namespaces):
    """Read an element name starting at ``lst[index]``.

    Returns ``(name, next_index)`` where *name* is ``'*'`` for a wildcard,
    or a ``(namespace_uri, local_name)`` pair. An unprefixed name gets the
    default *namespace*; a ``prefix : local`` token triple is resolved
    through the *namespaces* mapping.

    Raises ``NodeParsingError`` when the tokens are exhausted, when either
    side of the ``:`` is not a name token, or when the prefix is unknown.
    """
    if index == len(lst):
        raise NodeParsingError
    head = lst[index]
    if head == '*':
        return '*', index + 1
    if lst.get(index + 1) != ':':
        return (namespace, head), index + 1

    # prefix ':' local-name
    local = lst.get(index + 2)
    if not head.tag or not local or not local.tag:
        raise NodeParsingError
    try:
        uri = namespaces[head]
    except LookupError:
        raise NodeParsingError
    return (uri, local), index + 3
def read_position(lst, index):
    """Read an optional positional predicate ``[N]`` at ``lst[index]``.

    Returns ``(position, next_index)``. When the three tokens at *index* are
    not ``[``, something, ``]`` the result is ``(None, index)`` and nothing
    is consumed.

    Raises ``NodeParsingError`` when the bracketed token is not an integer,
    so a malformed selector is reported as a parse error rather than
    surfacing as a bare ``ValueError``.
    """
    if lst.get(index) == '[' and lst.get(index + 2) == ']':
        try:
            return int(lst[index + 1]), index + 3
        except ValueError:
            raise NodeParsingError
    return None, index
# XML attributes don't belong to the same namespace as containing tag
def read_att_test(lst, index, _namespace, namespaces):
    """Read an optional attribute predicate at ``lst[index]``.

    Two token shapes are recognised::

        [ @ name = value ]
        [ @ prefix : name = value ]

    Returns ``(att_name, att_value, next_index)`` where *att_name* is a
    ``(namespace_uri, local_name)`` pair; the URI is ``None`` for an
    unprefixed attribute (the default namespace, *_namespace*, is not
    applied to attributes). When no predicate is present the result is
    ``(None, None, index)``.

    Raises ``NodeParsingError`` when the prefix is missing from *namespaces*.
    """
    if lst.get(index) != '[' or lst.get(index + 1) != '@':
        return None, None, index

    if lst.get(index + 3) == '=' and lst.get(index + 5) == ']':
        return (None, lst[index + 2]), lst[index + 4], index + 6

    if lst.get(index + 3) == ':' and lst.get(index + 5) == '=' and lst.get(index + 7) == ']':
        try:
            uri = namespaces[lst[index + 2]]
        except LookupError:
            # Same treatment as an unknown element prefix.
            raise NodeParsingError
        return (uri, lst[index + 4]), lst[index + 6], index + 8

    return None, None, index
class Step(object):
    """One step of an element selector.

    ``name`` is either a plain string such as ``'*'`` or a
    ``(namespace_uri, local_name)`` pair. ``position`` is the optional
    positional predicate; ``att_name`` / ``att_value`` describe the optional
    attribute predicate, with ``att_name`` being a
    ``(namespace_uri, local_name)`` pair.
    """

    def __init__(self, name, position=None, att_name=None, att_value=None):
        self.name = name
        self.position = position
        self.att_name = att_name
        self.att_value = att_value

    def to_string(self, ns_prefix_mapping=None):
        """Render the step as XPath text.

        *ns_prefix_mapping* maps namespace URI -> prefix. A falsy prefix
        means the name is written without a prefix. A namespace missing from
        the mapping raises ``KeyError``.
        """
        if ns_prefix_mapping is None:
            ns_prefix_mapping = {}

        if isinstance(self.name, str):
            # Plain string names (e.g. '*') are emitted verbatim; checking the
            # type avoids unpacking a two-character string as a pair.
            res = self.name
        else:
            try:
                namespace, name = self.name
            except ValueError:
                res = self.name
            else:
                prefix = ns_prefix_mapping[namespace]
                res = prefix + ':' + name if prefix else name

        if self.position is not None:
            res += '[%s]' % self.position

        if self.att_name is not None:
            namespace, name = self.att_name
            # Attributes without a namespace never get a prefix.
            prefix = ns_prefix_mapping[namespace] if namespace else None
            if prefix:
                res += '[@%s:%s=%s]' % (prefix, name, quoteattr(self.att_value))
            else:
                res += '[@%s=%s]' % (name, quoteattr(self.att_value))
        return res

    def __repr__(self):
        # Trailing None arguments are dropped so the repr stays short.
        args = [self.name, self.position, self.att_name, self.att_value]
        while args and args[-1] is None:
            del args[-1]
        return 'Step(%s)' % ', '.join(repr(x) for x in args)
def read_step(lst, index, namespace, namespaces):
    """Read one step of the selector starting at ``lst[index]``.

    Returns ``(step, next_index)`` where *step* is one of:

    * an ``AttributeSelector`` for ``@name``;
    * a ``NamespaceSelector`` for the namespace axis, accepted as a
      ready-made selector object, as the single token ``{namespace}:*``,
      or as the three tokens ``namespace``, ``::``, ``*``;
    * a ``Step`` (element name plus optional position and attribute test).

    Raises ``NodeParsingError`` for a dangling ``@`` or a malformed element
    name.
    """
    token = lst.get(index)

    # The tokenizer may already have produced a terminal selector object.
    if isinstance(token, TerminalSelector):
        return token, index + 1

    if token == '@':
        att = lst.get(index + 1)
        if att is None:
            # '@' with nothing after it is a client error, not an IndexError.
            raise NodeParsingError
        return AttributeSelector(att), index + 2

    if token == '{namespace}:*':
        return NamespaceSelector(), index + 1
    if token == 'namespace' and lst.get(index + 1) == '::' and lst.get(index + 2) == '*':
        return NamespaceSelector(), index + 3

    tag, index = read_element_tag(lst, index, namespace, namespaces)
    position, index = read_position(lst, index)
    att_name, att_value, index = read_att_test(lst, index, namespace, namespaces)
    return Step(tag, position, att_name, att_value), index
def read_slash(lst, index):
    """Consume a ``/`` separator at ``lst[index]`` and return the next index.

    Raises ``NodeParsingError`` when the token is anything else (including
    the end of the token list).
    """
    if lst.get(index) != '/':
        raise NodeParsingError
    return index + 1
def read_node_selector(lst, namespace, namespaces):
    """Parse a token list into ``(ElementSelector, terminal_selector)``.

    A leading ``/`` is optional. Steps are read one after another, separated
    by ``/``. A terminal selector (attribute or namespace selector) must be
    the very last thing in the token list; it is returned separately and is
    ``None`` when the selector addresses an element.

    Raises ``NodeParsingError`` on any malformed input, e.g. tokens after a
    terminal selector or a missing ``/`` between steps.
    """
    index = read_slash(lst, 0) if lst.get(0) == '/' else 0
    steps = []
    terminal_selector = None
    while True:
        step, index = read_step(lst, index, namespace, namespaces)
        if isinstance(step, TerminalSelector):
            if index != len(lst):
                raise NodeParsingError
            terminal_selector = step
            break
        steps.append(step)
        if index == len(lst):
            break
        index = read_slash(lst, index)
    return ElementSelector(steps, namespace, namespaces), terminal_selector
def parse_node_selector(selector, namespace=None, namespaces=None):
    """Parse a node selector string.

    *namespace* is the default namespace applied to unprefixed element
    names; *namespaces* maps prefix -> namespace URI (defaults to an empty
    mapping). Returns ``(element_selector, terminal_selector)``; the
    original string is stored on the element selector as
    ``_original_selector``.

    Raises ``NodeParsingError`` (message rewritten to mention the selector)
    for malformed input. Any other exception is logged and re-raised.

    >>> parse_node_selector('/resource-lists', None, {})
    ([Step((None, 'resource-lists'))], None)
    >>> parse_node_selector('/resource-lists/list[1]/entry[@uri="sip:bob@example.com"]', None, {})
    ([Step((None, 'resource-lists')), Step((None, 'list'), 1), Step((None, 'entry'), None, (None, 'uri'), 'sip:bob@example.com')], None)
    >>> parse_node_selector('/*/list[1][@name="friends"]/@name')
    ([Step('*'), Step((None, 'list'), 1, (None, 'name'), 'friends')], AttributeSelector('name'))
    >>> parse_node_selector('/*[10][@att="val"]/namespace::*')
    ([Step('*', 10, (None, 'att'), 'val')], NamespaceSelector())
    >>> x = parse_node_selector('/resource-lists/list[@name="friends"]/external[@anchor="http://xcap.example.org/resource-lists/users/sip:a@example.org/index/~~/resource-lists/list%5b@name=%22mkting%22%5d"]')
    """
    if namespaces is None:
        # A fresh mapping per call; a shared default would accumulate state.
        namespaces = {}
    try:
        # The tokenizer gets its own copy so it cannot add entries to the
        # mapping that ends up stored on the returned selector.
        tokens = XPathTokenizer.tokens(selector, dict(namespaces))
        element_selector, terminal_selector = read_node_selector(tokens, namespace, namespaces)
        element_selector._original_selector = selector
        return element_selector, terminal_selector
    except NodeParsingError as e:
        e.args = ('Failed to parse node: %r' % selector,)
        raise
    except Exception:
        log.error('internal error in parse_node_selector(%r)' % selector)
        raise
# XPath selectors
class TerminalSelector(object):
    """Marker base class for selectors that may only end a node selector."""


class AttributeSelector(TerminalSelector):
    """Terminal selector addressing an attribute (``@name``)."""

    def __init__(self, attribute):
        self.attribute = attribute

    def __str__(self):
        return '@' + self.attribute

    def __repr__(self):
        return 'AttributeSelector(%r)' % self.attribute
class DocumentSelector(str):
    """Constructs a DocumentSelector containing the application_id, context,
    user_id and document_path taken from the given selector string.

    The string value of the instance is the selector exactly as given.

    >>> x = DocumentSelector('/resource-lists/users/sip:joe@example.com/index')
    >>> x.application_id, x.context, x.user_id, x.document_path
    ('resource-lists', 'users', 'sip:joe@example.com', 'index')

    >>> x = DocumentSelector('/rls-services/global/index')
    >>> x.application_id, x.context, x.user_id, x.document_path
    ('rls-services', 'global', None, 'index')
    """

    def __init__(self, selector):
        """Split *selector* into its parts.

        Raises ``DocumentSelectorError`` when the leading ``/``, the auid,
        the context (``users`` or ``global``), the user id (``users``
        context only) or the document path is missing.
        """
        if selector[:1] != '/':
            raise DocumentSelectorError("Document selector does not start with /")
        selector = selector[1:]
        # A single trailing slash is tolerated.
        if selector[-1:] == '/':
            selector = selector[:-1]
        if not selector:
            raise DocumentSelectorError("Document selector does not contain auid")

        segments = selector.split('/')
        if len(segments) < 2:
            raise DocumentSelectorError("Document selector does not contain context: %r" % selector)
        self.application_id = segments[0]
        self.context = segments[1]
        if self.context not in ("users", "global"):
            raise DocumentSelectorError("Document selector context must be either 'users' or 'global', not %r: %r" %
                                        (self.context, selector))

        self.user_id = None
        if self.context == "users":
            try:
                self.user_id = segments[2]
            except IndexError:
                raise DocumentSelectorError('Document selector does not contain user id: %r' % selector)
            segments = segments[3:]
        else:
            segments = segments[2:]
        if not segments:
            raise DocumentSelectorError("Document selector does not contain document's path: %r" % selector)
        # Everything left over is the (possibly nested) document path.
        self.document_path = '/'.join(segments)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, str.__repr__(self))
class ElementSelector(list):
    """A list of steps plus the namespace context they were parsed with.

    ``namespace`` is the default namespace for unprefixed names and
    ``namespaces`` maps prefix -> namespace URI.
    """

    # Captures the name of the first tag in an XML fragment. The name ends at
    # any whitespace, '>' or '/', so a newline or tab right after the name is
    # not swallowed into it.
    XML_TAG_REGEXP = re.compile(r'\s*<([^\s>/]+)')

    def __init__(self, lst, namespace, namespaces):
        list.__init__(self, lst)
        self.namespace = namespace
        self.namespaces = namespaces

    def _parse_qname(self, qname):
        """Turn ``'*'``, ``'name'`` or ``'prefix:name'`` into a step name.

        An unprefixed name (or one that does not split into exactly two
        parts on ``:``) is paired with the default namespace. An unknown
        prefix raises ``KeyError``.
        """
        if qname == '*':
            return qname
        try:
            prefix, name = qname.split(':')
        except ValueError:
            return (self.namespace, qname)
        return self.namespaces[prefix], name

    def replace_default_prefix(self, ns_prefix_mapping):
        """Render the selector as XPath text using *ns_prefix_mapping*.

        *ns_prefix_mapping* maps namespace URI -> prefix. Every step is
        rendered through its ``to_string``; this includes wildcard steps,
        which would otherwise be rendered with their ``repr``.
        """
        return '/' + '/'.join(step.to_string(ns_prefix_mapping) for step in self)

    def fix_star(self, element_body):
        """Replace a trailing bare ``*`` step with the root tag of *element_body*.

        *element_body* may be ``bytes`` or ``str``. When the last step is a
        wildcard without a position and the body starts with a tag, a copy
        of the selector is returned whose last step carries that tag's
        name; ``self`` and its steps are left untouched. In every other
        case ``self`` is returned unchanged.

        >>> elem_selector = parse_node_selector('/watcherinfo/watcher-list/*[@id="8ajksjda7s"]', None, {})[0]
        >>> elem_selector.fix_star(b'<watcher/>')[-1].name[1]
        'watcher'
        """
        if self and self[-1].name == '*' and self[-1].position is None:
            if isinstance(element_body, (bytes, bytearray)):
                element_body = element_body.decode()
            m = self.XML_TAG_REGEXP.match(element_body)
            if m:
                result = copy(self)
                # copy() is shallow: clone the last step too, otherwise the
                # rename below would also change the step held by ``self``.
                last = copy(self[-1])
                last.name = self._parse_qname(m.group(1))
                result[-1] = last
                return result
        return self
class NamespaceSelector(TerminalSelector):
    """Terminal selector for the namespace axis (``namespace::*``)."""

    def __str__(self):
        return "namespace::*"

    def __repr__(self):
        return 'NamespaceSelector()'
class NodeSelector(object):
    """A node selector together with the namespace bindings of its query part.

    The selector string has the form ``<path>[?<xmlns(...) expressions>]``.
    After construction the instance exposes ``element_selector``,
    ``terminal_selector`` and ``ns_bindings`` (prefix -> namespace URI).
    """

    # One ``xmlns(prefix=uri)`` expression; the group holds ``prefix=uri``.
    XMLNS_REGEXP = re.compile(r"xmlns\((?P<nsdata>.*?)\)")
    XPATH_DEFAULT_PREFIX = 'default'

    def __init__(self, selector, namespace=None):
        self._original_selector = selector
        sections = selector.split('?', 1)
        if len(sections) == 2:
            self.ns_bindings = self._parse_ns_bindings(sections[1])
        else:
            self.ns_bindings = {}
        self.element_selector, self.terminal_selector = parse_node_selector(sections[0], namespace, self.ns_bindings)

    def __str__(self):
        return self._original_selector

    # http://www.w3.org/TR/2003/REC-xptr-xmlns-20030325/
    def _parse_ns_bindings(self, query):
        """Extract ``{prefix: namespace}`` from the ``xmlns(...)`` parts of *query*.

        Only the first ``=`` separates prefix from namespace, so a namespace
        URI may itself contain ``=``. Expressions without any ``=`` are
        logged and skipped.
        """
        ns_bindings = {}
        for m in self.XMLNS_REGEXP.findall(query):
            prefix, sep, ns = m.partition('=')
            if not sep:
                log.error("Ignoring invalid XPointer XMLNS expression: %r" % m)
                continue
            ns_bindings[prefix] = ns
        return ns_bindings

    def replace_default_prefix(self, defprefix=None, append_terminal=True):
        """Render the selector as XPath text with an explicit default prefix.

        Names in the default namespace are prefixed with *defprefix*
        (``XPATH_DEFAULT_PREFIX`` when omitted). The terminal selector, if
        any, is appended unless *append_terminal* is false.
        """
        if defprefix is None:
            defprefix = self.XPATH_DEFAULT_PREFIX
        # Invert prefix -> namespace so steps can look up their prefix.
        namespace2prefix = {ns: prefix for prefix, ns in self.ns_bindings.items()}
        namespace2prefix[self.element_selector.namespace] = defprefix
        res = self.element_selector.replace_default_prefix(namespace2prefix)
        if append_terminal and self.terminal_selector:
            res += '/' + str(self.terminal_selector)
        return res

    def get_ns_bindings(self, default_ns):
        """Return a copy of the bindings with the default prefix bound to *default_ns*."""
        ns_bindings = self.ns_bindings.copy()
        ns_bindings[self.XPATH_DEFAULT_PREFIX] = default_ns
        return ns_bindings