diff --git a/sipsimple/payloads/addressbook.py b/sipsimple/payloads/addressbook.py index 9489c01f..477d9174 100644 --- a/sipsimple/payloads/addressbook.py +++ b/sipsimple/payloads/addressbook.py @@ -1,279 +1,279 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details. # """Addressbook related payload elements""" __all__ = ['namespace', 'Group', 'Contact', 'ContactURI', 'ContactURIList', 'ElementExtension', 'ElementAttributes'] from lxml import etree from sipsimple.payloads import XMLElement, XMLListElement, XMLStringElement, XMLBooleanElement, XMLElementID, XMLAttribute, XMLElementChild -from sipsimple.payloads.datatypes import AnyURI +from sipsimple.payloads.datatypes import AnyURI, ID from sipsimple.payloads.resourcelists import ResourceListsDocument, ListElement namespace = 'urn:ag-projects:xml:ns:addressbook' ResourceListsDocument.register_namespace(namespace, prefix='addressbook', schema='addressbook.xsd') class ElementExtension(object): pass class Name(XMLStringElement): _xml_tag = 'name' _xml_namespace = namespace _xml_document = ResourceListsDocument def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.value) class Group(XMLElement, ListElement): _xml_tag = 'group' _xml_namespace = namespace _xml_extension_type = ElementExtension _xml_document = ResourceListsDocument - id = XMLElementID('id', type=str, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) name = XMLElementChild('name', type=Name, required=True, test_equal=True) def __init__(self, id, name): XMLElement.__init__(self) self.id = id self.name = name def __unicode__(self): return unicode(self.name) def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.name) class ContactURI(XMLElement): _xml_tag = 'uri' _xml_namespace = namespace _xml_extension_type = ElementExtension _xml_document = ResourceListsDocument - id = XMLElementID('id', type=str, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) uri = XMLAttribute('uri', type=AnyURI, required=True, test_equal=True) type = XMLAttribute('type', type=unicode, required=False, test_equal=True) def __init__(self, id, uri, type=None): XMLElement.__init__(self) self.id = id self.uri = uri self.type = type def __unicode__(self): return unicode(self.uri) def __repr__(self): return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, self.uri, self.type) class ContactURIList(XMLListElement): _xml_tag = 'uris' _xml_namespace = namespace _xml_document = ResourceListsDocument _xml_item_type = ContactURI def __init__(self, uris=[]): XMLListElement.__init__(self) self.update(uris) class PolicyValue(str): def __new__(cls, value): if value not in ('allow', 'block', 'ignore', 'confirm'): raise ValueError("Invalid policy value: %s" % value) return super(PolicyValue, cls).__new__(cls, value) class Policy(XMLStringElement): _xml_tag = 'policy' _xml_namespace = namespace _xml_document = ResourceListsDocument _xml_value_type = PolicyValue class Subscribe(XMLBooleanElement): _xml_tag = 'subscribe' _xml_namespace = namespace _xml_document = ResourceListsDocument class DialogHandling(XMLElement): _xml_tag = 'dialog' _xml_namespace = namespace _xml_document = ResourceListsDocument policy = XMLElementChild('policy', type=Policy, required=True, test_equal=True) subscribe = XMLElementChild('subscribe', type=Subscribe, required=True, test_equal=True) def __init__(self, policy, subscribe): XMLElement.__init__(self) self.policy = policy self.subscribe = subscribe def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.policy, self.subscribe) class PresenceHandling(XMLElement): _xml_tag = 'presence' _xml_namespace = namespace _xml_document = ResourceListsDocument policy = XMLElementChild('policy', type=Policy, required=True, test_equal=True) subscribe = XMLElementChild('subscribe', type=Subscribe, required=True, test_equal=True) def __init__(self, policy, subscribe): XMLElement.__init__(self) self.policy = policy self.subscribe = subscribe def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.policy, self.subscribe) class Contact(XMLElement, ListElement): _xml_tag = 'contact' _xml_namespace = namespace _xml_extension_type = ElementExtension _xml_document = ResourceListsDocument - id = XMLElementID('id', type=str, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) group_id = XMLAttribute('group_id', type=str, required=True, test_equal=True) name = XMLElementChild('name', type=Name, required=True, test_equal=True) uris = XMLElementChild('uris', type=ContactURIList, required=True, test_equal=True) dialog = XMLElementChild('dialog', type=DialogHandling, required=True, test_equal=True) presence = XMLElementChild('presence', type=PresenceHandling, required=True, test_equal=True) def __init__(self, id, group_id, name, uris=[], presence_handling=None, dialog_handling=None): XMLElement.__init__(self) self.id = id self.group_id = group_id self.name = name self.uris = ContactURIList(uris) self.dialog = dialog_handling or DialogHandling('confirm', False) self.presence = presence_handling or PresenceHandling('confirm', False) def __repr__(self): return '%s(%r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.group_id, self.name, list(self.uris), self.presence, self.dialog) # # Extensions # class ElementAttributes(XMLElement, ElementExtension): _xml_tag = 'attributes' _xml_namespace = 'urn:ag-projects:sipsimple:xml:ns:addressbook' _xml_document = ResourceListsDocument def __init__(self, attributes={}): XMLElement.__init__(self) self._attributes = dict() self.update(attributes) def _parse_element(self, element): self._attributes = dict() attribute_tag = '{%s}attribute' % self._xml_namespace for child in (child for child in element if child.tag == attribute_tag): if 'nil' in child.attrib: self[child.attrib['name']] = None else: self[child.attrib['name']] = unicode(child.text or u'') def _build_element(self): self.element.clear() attribute_tag = '{%s}attribute' % self._xml_namespace for key, value in self.iteritems(): child = etree.SubElement(self.element, attribute_tag, nsmap=self._xml_document.nsmap) child.attrib['name'] = key if value is None: child.attrib['nil'] = 'true' else: child.text = value def __contains__(self, key): return key in self._attributes def __iter__(self): return iter(self._attributes) def __getitem__(self, key): return self._attributes[key] def __setitem__(self, key, value): if self._attributes.get(key, None) == value: return self._attributes[key] = value self.__dirty__ = True def __delitem__(self, key): del self._attributes[key] self.__dirty__ = True def __repr__(self): return "%s(%r)" % (self.__class__.__name__, dict(self)) def clear(self): if self._attributes: self._attributes.clear() self.__dirty__ = True def get(self, key, default=None): return self._attributes.get(key, default) def has_key(self, key): return key in self._attributes def items(self): return self._attributes.items() def iteritems(self): return self._attributes.iteritems() def iterkeys(self): return self._attributes.iterkeys() def itervalues(self): return self._attributes.itervalues() def keys(self): return self._attributes.keys() def pop(self, key, *args): value = self._attributes.pop(key, *args) if not args or value is not args[0]: self.__dirty__ = True return value def popitem(self): value = self._attributes.popitem() self.__dirty__ = True return value def setdefault(self, key, default=None): value = self._attributes.setdefault(key, default) if value is default: self.__dirty__ = True return value def update(self, attributes=(), **kw): self._attributes.update(attributes, **kw) if attributes or kw: self.__dirty__ = True ResourceListsDocument.register_namespace(ElementAttributes._xml_namespace, prefix='sipsimple') Group.register_extension('attributes', ElementAttributes) Contact.register_extension('attributes', ElementAttributes) ContactURI.register_extension('attributes', ElementAttributes) diff --git a/sipsimple/payloads/datatypes.py b/sipsimple/payloads/datatypes.py index 189da414..57854917 100644 --- a/sipsimple/payloads/datatypes.py +++ b/sipsimple/payloads/datatypes.py @@ -1,229 +1,238 @@ # Copyright (C) 2008-2011 AG Projects. See LICENSE for details. # """Data types used for simple XML elements and for XML attributes""" __all__ = ['Boolean', 'DateTime', 'Byte', 'UnsignedByte', 'Short', 'UnsignedShort', 'Int', 'UnsignedInt', 'Long', 'UnsignedLong', 'PositiveInteger', 'NegativeInteger', 'NonNegativeInteger', 'NonPositiveInteger', 'AnyURI', 'SIPURI', 'XCAPURI'] import re import urllib import urlparse from sipsimple.util import Timestamp class Boolean(int): def __new__(cls, value): return int.__new__(cls, bool(value)) def __repr__(self): return 'True' if self else 'False' __str__ = __repr__ @classmethod def __xmlparse__(cls, value): if value in ('True', 'true'): return int.__new__(cls, 1) elif value in ('False', 'false'): return int.__new__(cls, 0) else: raise ValueError("Invalid boolean string representation: %s" % value) def __xmlbuild__(self): return u'true' if self else u'false' class DateTime(Timestamp): @classmethod def __xmlparse__(cls, value): return cls.parse(value) def __xmlbuild__(self): return self.format(self) class Byte(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (-128 <= instance <= 127): raise ValueError("integer number must be a signed 8bit value") return instance class UnsignedByte(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (0 <= instance <= 255): raise ValueError("integer number must be an unsigned 8bit value") return instance class Short(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (-32768 <= instance <= 32767): raise ValueError("integer number must be a signed 16bit value") return instance class UnsignedShort(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (0 <= instance <= 65535): raise ValueError("integer number must be an unsigned 16bit value") return instance class Int(long): def __new__(cls, value): instance = long.__new__(cls, value) if not (-2147483648 <= instance <= 2147483647): raise ValueError("integer number must be a signed 32bit value") return instance class UnsignedInt(long): def __new__(cls, value): instance = long.__new__(cls, value) if not (0 <= instance <= 4294967295): raise ValueError("integer number must be an unsigned 32bit value") return instance class Long(long): def __new__(cls, value): instance = long.__new__(cls, value) if not (-9223372036854775808 <= instance <= 9223372036854775807): raise ValueError("integer number must be a signed 64bit value") return instance class UnsignedLong(long): def __new__(cls, value): instance = long.__new__(cls, value) if not (0 <= instance <= 18446744073709551615): raise ValueError("integer number must be an unsigned 64bit value") return instance class PositiveInteger(long): def __new__(cls, value): instance = long.__new__(cls, value) if instance <= 0: raise ValueError("integer number must be a positive value") return instance class NegativeInteger(long): def __new__(cls, value): instance = long.__new__(cls, value) if instance >= 0: raise ValueError("integer number must be a negative value") return instance class NonNegativeInteger(long): def __new__(cls, value): instance = long.__new__(cls, value) if instance < 0: raise ValueError("integer number must be a non-negative value") return instance class NonPositiveInteger(long): def __new__(cls, value): instance = long.__new__(cls, value) if instance > 0: raise ValueError("integer number must be a non-positive value") return instance class AnyURI(unicode): @classmethod def __xmlparse__(cls, value): return cls.__new__(cls, urllib.unquote(value).decode('utf-8')) def __xmlbuild__(self): return urllib.quote(self.encode('utf-8')) class SIPURI(AnyURI): _path_regex = re.compile(r'^((?P[^:@]+)(:(?P[^@]+))?@)?(?P.*)$') def __new__(cls, value): instance = AnyURI.__new__(cls, value) uri = urlparse.urlparse(instance) if uri.scheme not in ('sip', 'sips'): raise ValueError("illegal scheme for SIP URI: %s" % uri.scheme) instance.scheme = uri.scheme instance.__dict__.update(cls._path_regex.match(uri.path).groupdict()) instance.params = {} if uri.params: params = (param.split('=', 1) for param in uri.params.split(';')) for param in params: if not param[0]: raise ValueError("illegal SIP URI parameter name: %s" % param[0]) if len(param) == 1: param.append(None) elif '=' in param[1]: raise ValueError("illegal SIP URI parameter value: %s" % param[1]) instance.params[param[0]] = param[1] if uri.query: try: instance.headers = dict(header.split('=') for header in uri.query.split('&')) except ValueError: raise ValueError("illegal SIP URI headers: %s" % uri.query) else: for name, value in instance.headers.iteritems(): if not name or not value: raise ValueError("illegal URI header: %s=%s" % (name, value)) else: instance.headers = {} return instance class XCAPURI(AnyURI): _path_regex = re.compile(r'^(?P/(([^/]+)/)*)?(?P[^/]+)/((?Pglobal)|(users/(?P[^/]+)))/(?P~?(([^~]+~)|([^~]+))*)(/~~(?P.*))?$') def __new__(cls, value): instance = AnyURI.__new__(cls, value) uri = urlparse.urlparse(instance) if uri.scheme not in ('http', 'https', ''): raise ValueError("illegal scheme for XCAP URI: %s" % uri.scheme) instance.scheme = uri.scheme instance.username = uri.username instance.password = uri.password instance.hostname = uri.hostname instance.port = uri.port instance.__dict__.update(cls._path_regex.match(uri.path).groupdict()) instance.globaltree = instance.globaltree is not None if uri.query: try: instance.query = dict(header.split('=') for header in uri.query.split('&')) except ValueError: raise ValueError("illegal XCAP URI query string: %s" % uri.query) else: for name, value in instance.query.iteritems(): if not name or not value: raise ValueError("illegal XCAP URI query parameter: %s=%s" % (name, value)) else: instance.query = {} return instance relative = property(lambda self: self.scheme == '') +class ID(str): + _id_regex = re.compile(r'^[a-z_][a-z0-9_\.\-]*', re.I) + + def __new__(cls, value): + if not cls._id_regex.match(value): + raise ValueError("illegal ID value: %s" % value) + return str.__new__(cls, value) + + diff --git a/sipsimple/payloads/pidf.py b/sipsimple/payloads/pidf.py index fcce3d2b..fd71fc49 100644 --- a/sipsimple/payloads/pidf.py +++ b/sipsimple/payloads/pidf.py @@ -1,467 +1,467 @@ # Copyright (C) 2008-2011 AG Projects. See LICENSE for details. # """PIDF handling according to RFC3863 and RFC4479""" __all__ = ['pidf_namespace', 'dm_namespace', 'PIDFDocument', 'ServiceExtension', 'DeviceExtension', 'PersonExtension', 'StatusExtension', 'Note', 'DeviceID', 'Status', 'Basic', 'Contact', 'ServiceTimestamp', 'Service', 'DeviceTimestamp', 'Device', 'PersonTimestamp', 'Person', 'PIDF', # Extensions 'ExtendedStatus'] import weakref from sipsimple.payloads import ValidationError, XMLDocument, XMLListRootElement, XMLElement, XMLAttribute, XMLElementID, XMLElementChild from sipsimple.payloads import XMLStringElement, XMLLocalizedStringElement, XMLDateTimeElement, XMLAnyURIElement -from sipsimple.payloads.datatypes import AnyURI +from sipsimple.payloads.datatypes import AnyURI, ID pidf_namespace = 'urn:ietf:params:xml:ns:pidf' dm_namespace = 'urn:ietf:params:xml:ns:pidf:data-model' class PIDFDocument(XMLDocument): content_type = 'application/pidf+xml' PIDFDocument.register_namespace(pidf_namespace, prefix=None, schema='pidf.xsd') PIDFDocument.register_namespace(dm_namespace, prefix='dm', schema='data-model.xsd') ## Marker mixin class ServiceExtension(object): pass class DeviceExtension(object): pass class PersonExtension(object): pass class StatusExtension(object): pass ## Attribute value types class BasicStatusValue(str): def __new__(cls, value): if value not in ('closed', 'open'): raise ValueError('illegal BasicStatusValue') return str.__new__(cls, value) ## General elements class Note(unicode): def __new__(cls, value, lang=None): instance = unicode.__new__(cls, value) instance.lang = lang return instance def __repr__(self): return "%s(%s, lang=%r)" % (self.__class__.__name__, unicode.__repr__(self), self.lang) def __eq__(self, other): if isinstance(other, Note): return unicode.__eq__(self, other) and self.lang == other.lang elif isinstance(other, basestring): return self.lang is None and unicode.__eq__(self, other) else: return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal class PIDFNote(XMLLocalizedStringElement): _xml_tag = 'note' _xml_namespace = pidf_namespace _xml_document = PIDFDocument def __unicode__(self): return Note(self.value, self.lang) class DMNote(XMLLocalizedStringElement): _xml_tag = 'note' _xml_namespace = dm_namespace _xml_document = PIDFDocument def __unicode__(self): return Note(self.value, self.lang) class NoteMap(object): """Descriptor to be used for _note_map attributes on XML elements with notes""" def __init__(self): self.object_map = weakref.WeakKeyDictionary() def __get__(self, obj, type): if obj is None: return self return self.object_map.setdefault(obj, {}) def __set__(self, obj, value): raise AttributeError("cannot set attribute") def __delete__(self, obj): raise AttributeError("cannot delete attribute") class NoteList(object): def __init__(self, xml_element, note_type): self.xml_element = xml_element self.note_type = note_type def __contains__(self, item): if isinstance(item, Note): item = self.note_type(item, item.lang) elif isinstance(item, basestring): item = self.note_type(item) return item in self.xml_element._note_map.itervalues() def __iter__(self): return (unicode(self.xml_element._note_map[element]) for element in self.xml_element.element if element in self.xml_element._note_map) def __len__(self): return len(self.xml_element._note_map) def __nonzero__(self): return bool(self.xml_element._note_map) def _parse_element(self, element): self.xml_element._note_map.clear() for child in element: if child.tag == self.note_type.qname: try: note = self.note_type.from_element(child, xml_document=self.xml_element._xml_document) except ValidationError: pass else: self.xml_element._note_map[note.element] = note def _build_element(self): for note in self.xml_element._note_map.itervalues(): note.to_element() def add(self, item): if isinstance(item, Note): item = self.note_type(item, item.lang) elif isinstance(item, basestring): item = self.note_type(item) if type(item) is not self.note_type: raise TypeError("%s cannot add notes of type %s" % (self.xml_element.__class__.__name__, item.__class__.__name__)) self.xml_element._insert_element(item.element) self.xml_element._note_map[item.element] = item self.xml_element.__dirty__ = True def remove(self, item): if isinstance(item, Note): try: item = (entry for entry in self.xml_element._note_map.itervalues() if unicode(entry) == item).next() except StopIteration: raise KeyError(item) elif isinstance(item, basestring): try: item = (entry for entry in self.xml_element._note_map.itervalues() if entry == item).next() except StopIteration: raise KeyError(item) if type(item) is not self.note_type: raise KeyError(item) self.xml_element.element.remove(item.element) del self.xml_element._note_map[item.element] self.xml_element.__dirty__ = True def update(self, sequence): for item in sequence: self.add(item) def clear(self): for item in self.xml_element._note_map.values(): self.remove(item) class DeviceID(XMLStringElement): _xml_tag = 'deviceID' _xml_namespace = dm_namespace _xml_document = PIDFDocument ## Service elements class Basic(XMLStringElement): _xml_tag = 'basic' _xml_namespace = pidf_namespace _xml_document = PIDFDocument _xml_value_type = BasicStatusValue class Status(XMLElement): _xml_tag = 'status' _xml_namespace = pidf_namespace _xml_document = PIDFDocument _xml_extension_type = StatusExtension _xml_children_order = {Basic.qname: 0} basic = XMLElementChild('basic', type=Basic, required=False, test_equal=True) def __init__(self, basic=None): XMLElement.__init__(self) self.basic = basic def check_validity(self): if len(self.element) == 0: raise ValidationError("Status objects must have at least one child") super(Status, self).check_validity() def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.basic) class Contact(XMLAnyURIElement): _xml_tag = 'contact' _xml_namespace = pidf_namespace _xml_document = PIDFDocument priority = XMLAttribute('priority', type=float, required=False, test_equal=False) class ServiceTimestamp(XMLDateTimeElement): _xml_tag = 'timestamp' _xml_namespace = pidf_namespace _xml_document = PIDFDocument class Service(XMLElement): _xml_tag = 'tuple' _xml_namespace = pidf_namespace _xml_document = PIDFDocument _xml_extension_type = ServiceExtension _xml_children_order = {Status.qname: 0, None: 1, Contact.qname: 2, PIDFNote.qname: 3, ServiceTimestamp.qname: 4} - id = XMLElementID('id', type=str, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) status = XMLElementChild('status', type=Status, required=True, test_equal=True) contact = XMLElementChild('contact', type=Contact, required=False, test_equal=True) timestamp = XMLElementChild('timestamp', type=ServiceTimestamp, required=False, test_equal=True) device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id, notes=[], status=None, contact=None, timestamp=None, device_id=None): XMLElement.__init__(self) self.id = id self.status = status self.contact = contact self.timestamp = timestamp self.device_id = device_id self.notes.update(notes) @property def notes(self): return NoteList(self, PIDFNote) def __repr__(self): return '%s(%r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.status, self.contact, self.timestamp, self.device_id) def _parse_element(self, element): super(Service, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Service, self)._build_element() self.notes._build_element() class DeviceTimestamp(XMLDateTimeElement): _xml_tag = 'timestamp' _xml_namespace = dm_namespace _xml_document = PIDFDocument class Device(XMLElement): _xml_tag = 'device' _xml_namespace = dm_namespace _xml_document = PIDFDocument _xml_extension_type = DeviceExtension _xml_children_order = {None: 0, DeviceID.qname: 1, DMNote.qname: 2, DeviceTimestamp.qname: 3} - id = XMLElementID('id', type=str, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True) timestamp = XMLElementChild('timestamp', type=DeviceTimestamp, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id, device_id=None, notes=[], timestamp=None): XMLElement.__init__(self) self.id = id self.device_id = device_id self.timestamp = timestamp self.notes.update(notes) @property def notes(self): return NoteList(self, DMNote) def __repr__(self): return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.device_id, list(self.notes), self.timestamp) def _parse_element(self, element): super(Device, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Device, self)._build_element() self.notes._build_element() class PersonTimestamp(XMLDateTimeElement): _xml_tag = 'timestamp' _xml_namespace = dm_namespace _xml_document = PIDFDocument class Person(XMLElement): _xml_tag = 'person' _xml_namespace = dm_namespace _xml_document = PIDFDocument _xml_extension_type = PersonExtension _xml_children_order = {None: 0, DMNote.qname: 1, PersonTimestamp.qname: 2} - id = XMLElementID('id', type=str, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) timestamp = XMLElementChild('timestamp', type=PersonTimestamp, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id, notes=[], timestamp=None): XMLElement.__init__(self) self.id = id self.timestamp = timestamp self.notes.update(notes) @property def notes(self): return NoteList(self, DMNote) def __repr__(self): return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.timestamp) def _parse_element(self, element): super(Person, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Person, self)._build_element() self.notes._build_element() class PIDF(XMLListRootElement): _xml_tag = 'presence' _xml_namespace = pidf_namespace _xml_document = PIDFDocument _xml_children_order = {Service.qname: 0, PIDFNote.qname: 1, Person.qname: 2, Device.qname: 3} _xml_item_type = (Service, PIDFNote, Person, Device) entity = XMLAttribute('entity', type=AnyURI, required=True, test_equal=True) services = property(lambda self: (item for item in self if type(item) is Service)) notes = property(lambda self: (item for item in self if type(item) is Note)) persons = property(lambda self: (item for item in self if type(item) is Person)) devices = property(lambda self: (item for item in self if type(item) is Device)) def __init__(self, entity, elements=[]): XMLListRootElement.__init__(self) self.entity = entity self.update(elements) def __contains__(self, item): if isinstance(item, Note): item = PIDFNote(item, item.lang) return super(PIDF, self).__contains__(item) def __iter__(self): return (unicode(item) if type(item) is PIDFNote else item for item in super(PIDF, self).__iter__()) def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.entity, list(self)) def add(self, item): if isinstance(item, Note): item = PIDFNote(item, item.lang) super(PIDF, self).add(item) def remove(self, item): if isinstance(item, Note): try: item = (entry for entry in super(PIDF, self).__iter__() if type(entry) is PIDFNote and unicode(entry) == item).next() except StopIteration: raise KeyError(item) super(PIDF, self).remove(item) # # Extensions # agp_pidf_namespace = 'urn:ag-projects:xml:ns:pidf' PIDFDocument.register_namespace(agp_pidf_namespace, prefix='agp-pidf') class ExtendedStatusValue(str): def __new__(cls, value): if value not in ('available', 'offline', 'away', 'extended-away', 'busy'): raise ValueError("illegal value for extended status") return str.__new__(cls, value) class ExtendedStatus(XMLStringElement, StatusExtension): _xml_tag = 'extended' _xml_namespace = agp_pidf_namespace _xml_document = PIDFDocument _xml_value_type = ExtendedStatusValue Status.register_extension('extended', type=ExtendedStatus) class DeviceInfo(XMLElement, ServiceExtension): _xml_tag = 'device-info' _xml_namespace = agp_pidf_namespace _xml_document = PIDFDocument - id = XMLElementID('id', type=str, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) description = XMLAttribute('description', type=str, required=False, test_equal=True) def __init__(self, id, description=None): XMLElement.__init__(self) self.id = id self.description = description def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.description) Service.register_extension('device_info', type=DeviceInfo) diff --git a/sipsimple/payloads/policy.py b/sipsimple/payloads/policy.py index f3ddc240..2fc1a3e9 100644 --- a/sipsimple/payloads/policy.py +++ b/sipsimple/payloads/policy.py @@ -1,353 +1,354 @@ # Copyright (C) 2008-2011 AG Projects. See LICENSE for details. # """ Generic data types to be used in policy applications, according to RFC4745. """ __all__ = ['namespace', 'CommonPolicyDocument', 'ConditionElement', 'ActionElement', 'TransformationElement', 'RuleExtension', 'IdentityOne', 'IdentityExcept', 'IdentityMany', 'Identity', 'Validity', 'Conditions', 'Actions', 'Transformations', 'Rule', 'RuleSet', # Extensions 'FalseCondition', 'RuleDisplayName'] from sipsimple.payloads import ValidationError, XMLDocument, XMLElement, XMLListElement, XMLListRootElement, XMLAttribute, XMLElementID, XMLElementChild, XMLLocalizedStringElement, XMLDateTimeElement +from sipsimple.payloads.datatypes import AnyURI, ID namespace = 'urn:ietf:params:xml:ns:common-policy' class CommonPolicyDocument(XMLDocument): content_type = 'application/auth-policy+xml' CommonPolicyDocument.register_namespace(namespace, prefix='cp', schema='common-policy.xsd') ## Mixin types for extensibility class ConditionElement(object): pass class ActionElement(object): pass class TransformationElement(object): pass class RuleExtension(object): pass ## Elements class IdentityOne(XMLElement): _xml_tag = 'one' _xml_namespace = namespace _xml_document = CommonPolicyDocument id = XMLElementID('id', type=str, required=True, test_equal=True) def __init__(self, id): XMLElement.__init__(self) self.id = id def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.id) def __str__(self): return self.id def matches(self, uri): return self.id == uri class IdentityExcept(XMLElement): _xml_tag = 'except' _xml_namespace = namespace _xml_document = CommonPolicyDocument def _onset_id(self, attribute, value): if value is not None: self.domain = None id = XMLAttribute('id', type=str, required=False, test_equal=True, onset=_onset_id) del _onset_id def _onset_domain(self, attribute, value): if value is not None: self.id = None domain = XMLAttribute('domain', type=str, required=False, test_equal=True, onset=_onset_domain) del _onset_domain def __init__(self, id=None, domain=None): XMLElement.__init__(self) self.id = id self.domain = domain def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.domain) def __str__(self): if self.id is not None: return self.id else: return self.domain def matches(self, uri): if self.id is not None: return self.id != uri else: return [self.domain] != uri.split('@', 1)[1:] class IdentityMany(XMLListElement): _xml_tag = 'many' _xml_namespace = namespace _xml_document = CommonPolicyDocument _xml_children_order = {IdentityExcept.qname: 0} _xml_item_type = IdentityExcept domain = XMLAttribute('domain', type=str, required=False, test_equal=True) def __init__(self, domain=None, exceptions=[]): XMLListElement.__init__(self) self.domain = domain self.update(exceptions) def __repr__(self): return '%s(%r, %s)' % (self.__class__.__name__, self.domain, list(self)) def matches(self, uri): if self.domain is not None: if self.domain != uri.partition('@')[2]: return False for child in self: if not child.matches(uri): return False return True class Identity(XMLListElement): _xml_tag = 'identity' _xml_namespace = namespace _xml_document = CommonPolicyDocument _xml_item_type = (IdentityOne, IdentityMany) def __init__(self, identities=[]): XMLListElement.__init__(self) self.update(identities) def matches(self, uri): for child in self: if child.matches(uri): return True return False class Sphere(XMLElement): _xml_tag = 'sphere' _xml_namespace = namespace _xml_document = CommonPolicyDocument value = XMLAttribute('value', type=str, required=True, test_equal=True) def __init__(self, value): XMLElement.__init__(self) self.value = value def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.value) class ValidFrom(XMLDateTimeElement): _xml_tag = 'from' _xml_namespace = namespace _xml_document = CommonPolicyDocument class ValidUntil(XMLDateTimeElement): _xml_tag = 'until' _xml_namespace = namespace _xml_document = CommonPolicyDocument class ValidityInterval(object): def __init__(self, from_timestamp, until_timestamp): self.valid_from = ValidFrom(from_timestamp) self.valid_until = ValidUntil(until_timestamp) def __eq__(self, other): if isinstance(other, ValidityInterval): return self is other or (self.valid_from == other.valid_from and self.valid_until == other.valid_until) return NotImplemented def __ne__(self, other): if isinstance(other, ValidityInterval): return self is not other and (self.valid_from != other.valid_from or self.valid_until != other.valid_until) return NotImplemented @classmethod def from_elements(cls, from_element, until_element, xml_document=None): instance = object.__new__(cls) instance.valid_from = ValidFrom.from_element(from_element, xml_document) instance.valid_until = ValidUntil.from_element(until_element, xml_document) return instance class Validity(XMLListElement): _xml_tag = 'validity' _xml_namespace = namespace _xml_document = CommonPolicyDocument _xml_item_type = ValidityInterval def __init__(self, children=[]): XMLListElement.__init__(self) self.update(children) def _parse_element(self, element): iterator = iter(element) for first_child in iterator: second_child = iterator.next() if first_child.tag == '{%s}from' % self._xml_namespace and second_child.tag == '{%s}until' % self._xml_namespace: try: item = ValidityInterval.from_elements(first_child, second_child, xml_document=self._xml_document) except: pass else: self._element_map[item.valid_from.element] = item def _build_element(self): for child in self: child.valid_from.to_element() child.valid_until.to_element() def add(self, item): if not isinstance(item, ValidityInterval): raise TypeError("Validity element must be a ValidityInterval instance") self._insert_element(item.valid_from.element) self._insert_element(item.valid_until.element) self._element_map[item.valid_from.element] = item self.__dirty__ = True def remove(self, item): self.element.remove(item.valid_from.element) self.element.remove(item.valid_until.element) del self._element_map[item.valid_from.element] self.__dirty__ = True def check_validity(self): if not self: raise ValidationError("cannot have Validity element without any children") super(Validity, self).check_validity(self) class Conditions(XMLListElement): _xml_tag = 'conditions' _xml_namespace = namespace _xml_document = CommonPolicyDocument _xml_children_order = {Identity.qname: 0, Sphere.qname: 1, Validity.qname: 2} _xml_item_type = (Identity, Sphere, Validity, ConditionElement) def __init__(self, conditions=[]): XMLListElement.__init__(self) self.update(conditions) class Actions(XMLListElement): _xml_tag = 'actions' _xml_namespace = namespace _xml_document = CommonPolicyDocument _xml_item_type = ActionElement def __init__(self, actions=[]): XMLListElement.__init__(self) self.update(actions) class Transformations(XMLListElement): _xml_tag = 'transformations' _xml_namespace = namespace _xml_document = CommonPolicyDocument _xml_item_type = TransformationElement def __init__(self, transformations=[]): XMLListElement.__init__(self) self.update(transformations) class Rule(XMLElement): _xml_tag = 'rule' _xml_namespace = namespace _xml_extension_type = RuleExtension _xml_document = CommonPolicyDocument _xml_children_order = {Conditions.qname: 0, Actions.qname: 1, Transformations.qname: 2} - id = XMLElementID('id', type=unicode, required=True, test_equal=True) + id = XMLElementID('id', type=ID, required=True, test_equal=True) conditions = XMLElementChild('conditions', type=Conditions, required=False, test_equal=True) actions = XMLElementChild('actions', type=Actions, required=False, test_equal=True) transformations = XMLElementChild('transformations', type=Transformations, required=False, test_equal=True) def __init__(self, id, conditions=None, actions=None, transformations=None): XMLElement.__init__(self) self.id = id self.conditions = conditions self.actions = actions self.transformations = transformations def __repr__(self): return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.conditions, self.actions, self.transformations) class RuleSet(XMLListRootElement): _xml_tag = 'ruleset' _xml_namespace = namespace _xml_document = CommonPolicyDocument _xml_item_type = Rule def __init__(self, rules=[]): XMLListRootElement.__init__(self) self.update(rules) def __getitem__(self, key): return self._xmlid_map[Rule][key] def __delitem__(self, key): self.remove(self._xmlid_map[Rule][key]) # # Extensions # agp_cp_namespace = 'urn:ag-projects:xml:ns:common-policy' CommonPolicyDocument.register_namespace(agp_cp_namespace, prefix='agp-cp') # A condition element in the AG Projects namespace, it will always be evaluated to false # because it's not understood by servers class FalseCondition(XMLElement, ConditionElement): _xml_tag = 'false-condition' _xml_namespace = agp_cp_namespace _xml_document = CommonPolicyDocument class RuleDisplayName(XMLLocalizedStringElement, RuleExtension): _xml_tag = 'display-name' _xml_namespace = agp_cp_namespace _xml_document = CommonPolicyDocument Rule.register_extension('display_name', RuleDisplayName)