Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/payloads/addressbook.py b/sipsimple/payloads/addressbook.py
index 9489c01f..477d9174 100644
--- a/sipsimple/payloads/addressbook.py
+++ b/sipsimple/payloads/addressbook.py
@@ -1,279 +1,279 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details.
#
"""Addressbook related payload elements"""
__all__ = ['namespace', 'Group', 'Contact', 'ContactURI', 'ContactURIList', 'ElementExtension', 'ElementAttributes']
from lxml import etree
from sipsimple.payloads import XMLElement, XMLListElement, XMLStringElement, XMLBooleanElement, XMLElementID, XMLAttribute, XMLElementChild
-from sipsimple.payloads.datatypes import AnyURI
+from sipsimple.payloads.datatypes import AnyURI, ID
from sipsimple.payloads.resourcelists import ResourceListsDocument, ListElement
namespace = 'urn:ag-projects:xml:ns:addressbook'
ResourceListsDocument.register_namespace(namespace, prefix='addressbook', schema='addressbook.xsd')
class ElementExtension(object): pass
class Name(XMLStringElement):
_xml_tag = 'name'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.value)
class Group(XMLElement, ListElement):
_xml_tag = 'group'
_xml_namespace = namespace
_xml_extension_type = ElementExtension
_xml_document = ResourceListsDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
name = XMLElementChild('name', type=Name, required=True, test_equal=True)
def __init__(self, id, name):
XMLElement.__init__(self)
self.id = id
self.name = name
def __unicode__(self):
return unicode(self.name)
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.name)
class ContactURI(XMLElement):
_xml_tag = 'uri'
_xml_namespace = namespace
_xml_extension_type = ElementExtension
_xml_document = ResourceListsDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
uri = XMLAttribute('uri', type=AnyURI, required=True, test_equal=True)
type = XMLAttribute('type', type=unicode, required=False, test_equal=True)
def __init__(self, id, uri, type=None):
XMLElement.__init__(self)
self.id = id
self.uri = uri
self.type = type
def __unicode__(self):
return unicode(self.uri)
def __repr__(self):
return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, self.uri, self.type)
class ContactURIList(XMLListElement):
_xml_tag = 'uris'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
_xml_item_type = ContactURI
def __init__(self, uris=[]):
XMLListElement.__init__(self)
self.update(uris)
class PolicyValue(str):
def __new__(cls, value):
if value not in ('allow', 'block', 'ignore', 'confirm'):
raise ValueError("Invalid policy value: %s" % value)
return super(PolicyValue, cls).__new__(cls, value)
class Policy(XMLStringElement):
_xml_tag = 'policy'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
_xml_value_type = PolicyValue
class Subscribe(XMLBooleanElement):
_xml_tag = 'subscribe'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
class DialogHandling(XMLElement):
_xml_tag = 'dialog'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
policy = XMLElementChild('policy', type=Policy, required=True, test_equal=True)
subscribe = XMLElementChild('subscribe', type=Subscribe, required=True, test_equal=True)
def __init__(self, policy, subscribe):
XMLElement.__init__(self)
self.policy = policy
self.subscribe = subscribe
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.policy, self.subscribe)
class PresenceHandling(XMLElement):
_xml_tag = 'presence'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
policy = XMLElementChild('policy', type=Policy, required=True, test_equal=True)
subscribe = XMLElementChild('subscribe', type=Subscribe, required=True, test_equal=True)
def __init__(self, policy, subscribe):
XMLElement.__init__(self)
self.policy = policy
self.subscribe = subscribe
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.policy, self.subscribe)
class Contact(XMLElement, ListElement):
_xml_tag = 'contact'
_xml_namespace = namespace
_xml_extension_type = ElementExtension
_xml_document = ResourceListsDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
group_id = XMLAttribute('group_id', type=str, required=True, test_equal=True)
name = XMLElementChild('name', type=Name, required=True, test_equal=True)
uris = XMLElementChild('uris', type=ContactURIList, required=True, test_equal=True)
dialog = XMLElementChild('dialog', type=DialogHandling, required=True, test_equal=True)
presence = XMLElementChild('presence', type=PresenceHandling, required=True, test_equal=True)
def __init__(self, id, group_id, name, uris=[], presence_handling=None, dialog_handling=None):
XMLElement.__init__(self)
self.id = id
self.group_id = group_id
self.name = name
self.uris = ContactURIList(uris)
self.dialog = dialog_handling or DialogHandling('confirm', False)
self.presence = presence_handling or PresenceHandling('confirm', False)
def __repr__(self):
return '%s(%r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.group_id, self.name, list(self.uris), self.presence, self.dialog)
#
# Extensions
#
class ElementAttributes(XMLElement, ElementExtension):
_xml_tag = 'attributes'
_xml_namespace = 'urn:ag-projects:sipsimple:xml:ns:addressbook'
_xml_document = ResourceListsDocument
def __init__(self, attributes={}):
XMLElement.__init__(self)
self._attributes = dict()
self.update(attributes)
def _parse_element(self, element):
self._attributes = dict()
attribute_tag = '{%s}attribute' % self._xml_namespace
for child in (child for child in element if child.tag == attribute_tag):
if 'nil' in child.attrib:
self[child.attrib['name']] = None
else:
self[child.attrib['name']] = unicode(child.text or u'')
def _build_element(self):
self.element.clear()
attribute_tag = '{%s}attribute' % self._xml_namespace
for key, value in self.iteritems():
child = etree.SubElement(self.element, attribute_tag, nsmap=self._xml_document.nsmap)
child.attrib['name'] = key
if value is None:
child.attrib['nil'] = 'true'
else:
child.text = value
def __contains__(self, key):
return key in self._attributes
def __iter__(self):
return iter(self._attributes)
def __getitem__(self, key):
return self._attributes[key]
def __setitem__(self, key, value):
if self._attributes.get(key, None) == value:
return
self._attributes[key] = value
self.__dirty__ = True
def __delitem__(self, key):
del self._attributes[key]
self.__dirty__ = True
def __repr__(self):
return "%s(%r)" % (self.__class__.__name__, dict(self))
def clear(self):
if self._attributes:
self._attributes.clear()
self.__dirty__ = True
def get(self, key, default=None):
return self._attributes.get(key, default)
def has_key(self, key):
return key in self._attributes
def items(self):
return self._attributes.items()
def iteritems(self):
return self._attributes.iteritems()
def iterkeys(self):
return self._attributes.iterkeys()
def itervalues(self):
return self._attributes.itervalues()
def keys(self):
return self._attributes.keys()
def pop(self, key, *args):
value = self._attributes.pop(key, *args)
if not args or value is not args[0]:
self.__dirty__ = True
return value
def popitem(self):
value = self._attributes.popitem()
self.__dirty__ = True
return value
def setdefault(self, key, default=None):
value = self._attributes.setdefault(key, default)
if value is default:
self.__dirty__ = True
return value
def update(self, attributes=(), **kw):
self._attributes.update(attributes, **kw)
if attributes or kw:
self.__dirty__ = True
ResourceListsDocument.register_namespace(ElementAttributes._xml_namespace, prefix='sipsimple')
Group.register_extension('attributes', ElementAttributes)
Contact.register_extension('attributes', ElementAttributes)
ContactURI.register_extension('attributes', ElementAttributes)
diff --git a/sipsimple/payloads/datatypes.py b/sipsimple/payloads/datatypes.py
index 189da414..57854917 100644
--- a/sipsimple/payloads/datatypes.py
+++ b/sipsimple/payloads/datatypes.py
@@ -1,229 +1,238 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""Data types used for simple XML elements and for XML attributes"""
__all__ = ['Boolean', 'DateTime', 'Byte', 'UnsignedByte', 'Short', 'UnsignedShort', 'Int', 'UnsignedInt', 'Long', 'UnsignedLong',
'PositiveInteger', 'NegativeInteger', 'NonNegativeInteger', 'NonPositiveInteger', 'AnyURI', 'SIPURI', 'XCAPURI']
import re
import urllib
import urlparse
from sipsimple.util import Timestamp
class Boolean(int):
def __new__(cls, value):
return int.__new__(cls, bool(value))
def __repr__(self):
return 'True' if self else 'False'
__str__ = __repr__
@classmethod
def __xmlparse__(cls, value):
if value in ('True', 'true'):
return int.__new__(cls, 1)
elif value in ('False', 'false'):
return int.__new__(cls, 0)
else:
raise ValueError("Invalid boolean string representation: %s" % value)
def __xmlbuild__(self):
return u'true' if self else u'false'
class DateTime(Timestamp):
@classmethod
def __xmlparse__(cls, value):
return cls.parse(value)
def __xmlbuild__(self):
return self.format(self)
class Byte(int):
def __new__(cls, value):
instance = int.__new__(cls, value)
if not (-128 <= instance <= 127):
raise ValueError("integer number must be a signed 8bit value")
return instance
class UnsignedByte(int):
def __new__(cls, value):
instance = int.__new__(cls, value)
if not (0 <= instance <= 255):
raise ValueError("integer number must be an unsigned 8bit value")
return instance
class Short(int):
def __new__(cls, value):
instance = int.__new__(cls, value)
if not (-32768 <= instance <= 32767):
raise ValueError("integer number must be a signed 16bit value")
return instance
class UnsignedShort(int):
def __new__(cls, value):
instance = int.__new__(cls, value)
if not (0 <= instance <= 65535):
raise ValueError("integer number must be an unsigned 16bit value")
return instance
class Int(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if not (-2147483648 <= instance <= 2147483647):
raise ValueError("integer number must be a signed 32bit value")
return instance
class UnsignedInt(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if not (0 <= instance <= 4294967295):
raise ValueError("integer number must be an unsigned 32bit value")
return instance
class Long(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if not (-9223372036854775808 <= instance <= 9223372036854775807):
raise ValueError("integer number must be a signed 64bit value")
return instance
class UnsignedLong(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if not (0 <= instance <= 18446744073709551615):
raise ValueError("integer number must be an unsigned 64bit value")
return instance
class PositiveInteger(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if instance <= 0:
raise ValueError("integer number must be a positive value")
return instance
class NegativeInteger(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if instance >= 0:
raise ValueError("integer number must be a negative value")
return instance
class NonNegativeInteger(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if instance < 0:
raise ValueError("integer number must be a non-negative value")
return instance
class NonPositiveInteger(long):
def __new__(cls, value):
instance = long.__new__(cls, value)
if instance > 0:
raise ValueError("integer number must be a non-positive value")
return instance
class AnyURI(unicode):
@classmethod
def __xmlparse__(cls, value):
return cls.__new__(cls, urllib.unquote(value).decode('utf-8'))
def __xmlbuild__(self):
return urllib.quote(self.encode('utf-8'))
class SIPURI(AnyURI):
_path_regex = re.compile(r'^((?P<username>[^:@]+)(:(?P<password>[^@]+))?@)?(?P<domain>.*)$')
def __new__(cls, value):
instance = AnyURI.__new__(cls, value)
uri = urlparse.urlparse(instance)
if uri.scheme not in ('sip', 'sips'):
raise ValueError("illegal scheme for SIP URI: %s" % uri.scheme)
instance.scheme = uri.scheme
instance.__dict__.update(cls._path_regex.match(uri.path).groupdict())
instance.params = {}
if uri.params:
params = (param.split('=', 1) for param in uri.params.split(';'))
for param in params:
if not param[0]:
raise ValueError("illegal SIP URI parameter name: %s" % param[0])
if len(param) == 1:
param.append(None)
elif '=' in param[1]:
raise ValueError("illegal SIP URI parameter value: %s" % param[1])
instance.params[param[0]] = param[1]
if uri.query:
try:
instance.headers = dict(header.split('=') for header in uri.query.split('&'))
except ValueError:
raise ValueError("illegal SIP URI headers: %s" % uri.query)
else:
for name, value in instance.headers.iteritems():
if not name or not value:
raise ValueError("illegal URI header: %s=%s" % (name, value))
else:
instance.headers = {}
return instance
class XCAPURI(AnyURI):
_path_regex = re.compile(r'^(?P<root>/(([^/]+)/)*)?(?P<auid>[^/]+)/((?P<globaltree>global)|(users/(?P<userstree>[^/]+)))/(?P<document>~?(([^~]+~)|([^~]+))*)(/~~(?P<node>.*))?$')
def __new__(cls, value):
instance = AnyURI.__new__(cls, value)
uri = urlparse.urlparse(instance)
if uri.scheme not in ('http', 'https', ''):
raise ValueError("illegal scheme for XCAP URI: %s" % uri.scheme)
instance.scheme = uri.scheme
instance.username = uri.username
instance.password = uri.password
instance.hostname = uri.hostname
instance.port = uri.port
instance.__dict__.update(cls._path_regex.match(uri.path).groupdict())
instance.globaltree = instance.globaltree is not None
if uri.query:
try:
instance.query = dict(header.split('=') for header in uri.query.split('&'))
except ValueError:
raise ValueError("illegal XCAP URI query string: %s" % uri.query)
else:
for name, value in instance.query.iteritems():
if not name or not value:
raise ValueError("illegal XCAP URI query parameter: %s=%s" % (name, value))
else:
instance.query = {}
return instance
relative = property(lambda self: self.scheme == '')
+class ID(str):
+ _id_regex = re.compile(r'^[a-z_][a-z0-9_\.\-]*', re.I)
+
+ def __new__(cls, value):
+ if not cls._id_regex.match(value):
+ raise ValueError("illegal ID value: %s" % value)
+ return str.__new__(cls, value)
+
+
diff --git a/sipsimple/payloads/pidf.py b/sipsimple/payloads/pidf.py
index fcce3d2b..fd71fc49 100644
--- a/sipsimple/payloads/pidf.py
+++ b/sipsimple/payloads/pidf.py
@@ -1,467 +1,467 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""PIDF handling according to RFC3863 and RFC4479"""
__all__ = ['pidf_namespace',
'dm_namespace',
'PIDFDocument',
'ServiceExtension',
'DeviceExtension',
'PersonExtension',
'StatusExtension',
'Note',
'DeviceID',
'Status',
'Basic',
'Contact',
'ServiceTimestamp',
'Service',
'DeviceTimestamp',
'Device',
'PersonTimestamp',
'Person',
'PIDF',
# Extensions
'ExtendedStatus']
import weakref
from sipsimple.payloads import ValidationError, XMLDocument, XMLListRootElement, XMLElement, XMLAttribute, XMLElementID, XMLElementChild
from sipsimple.payloads import XMLStringElement, XMLLocalizedStringElement, XMLDateTimeElement, XMLAnyURIElement
-from sipsimple.payloads.datatypes import AnyURI
+from sipsimple.payloads.datatypes import AnyURI, ID
pidf_namespace = 'urn:ietf:params:xml:ns:pidf'
dm_namespace = 'urn:ietf:params:xml:ns:pidf:data-model'
class PIDFDocument(XMLDocument):
content_type = 'application/pidf+xml'
PIDFDocument.register_namespace(pidf_namespace, prefix=None, schema='pidf.xsd')
PIDFDocument.register_namespace(dm_namespace, prefix='dm', schema='data-model.xsd')
## Marker mixin
class ServiceExtension(object): pass
class DeviceExtension(object): pass
class PersonExtension(object): pass
class StatusExtension(object): pass
## Attribute value types
class BasicStatusValue(str):
def __new__(cls, value):
if value not in ('closed', 'open'):
raise ValueError('illegal BasicStatusValue')
return str.__new__(cls, value)
## General elements
class Note(unicode):
def __new__(cls, value, lang=None):
instance = unicode.__new__(cls, value)
instance.lang = lang
return instance
def __repr__(self):
return "%s(%s, lang=%r)" % (self.__class__.__name__, unicode.__repr__(self), self.lang)
def __eq__(self, other):
if isinstance(other, Note):
return unicode.__eq__(self, other) and self.lang == other.lang
elif isinstance(other, basestring):
return self.lang is None and unicode.__eq__(self, other)
else:
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
class PIDFNote(XMLLocalizedStringElement):
_xml_tag = 'note'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
def __unicode__(self):
return Note(self.value, self.lang)
class DMNote(XMLLocalizedStringElement):
_xml_tag = 'note'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
def __unicode__(self):
return Note(self.value, self.lang)
class NoteMap(object):
"""Descriptor to be used for _note_map attributes on XML elements with notes"""
def __init__(self):
self.object_map = weakref.WeakKeyDictionary()
def __get__(self, obj, type):
if obj is None:
return self
return self.object_map.setdefault(obj, {})
def __set__(self, obj, value):
raise AttributeError("cannot set attribute")
def __delete__(self, obj):
raise AttributeError("cannot delete attribute")
class NoteList(object):
def __init__(self, xml_element, note_type):
self.xml_element = xml_element
self.note_type = note_type
def __contains__(self, item):
if isinstance(item, Note):
item = self.note_type(item, item.lang)
elif isinstance(item, basestring):
item = self.note_type(item)
return item in self.xml_element._note_map.itervalues()
def __iter__(self):
return (unicode(self.xml_element._note_map[element]) for element in self.xml_element.element if element in self.xml_element._note_map)
def __len__(self):
return len(self.xml_element._note_map)
def __nonzero__(self):
return bool(self.xml_element._note_map)
def _parse_element(self, element):
self.xml_element._note_map.clear()
for child in element:
if child.tag == self.note_type.qname:
try:
note = self.note_type.from_element(child, xml_document=self.xml_element._xml_document)
except ValidationError:
pass
else:
self.xml_element._note_map[note.element] = note
def _build_element(self):
for note in self.xml_element._note_map.itervalues():
note.to_element()
def add(self, item):
if isinstance(item, Note):
item = self.note_type(item, item.lang)
elif isinstance(item, basestring):
item = self.note_type(item)
if type(item) is not self.note_type:
raise TypeError("%s cannot add notes of type %s" % (self.xml_element.__class__.__name__, item.__class__.__name__))
self.xml_element._insert_element(item.element)
self.xml_element._note_map[item.element] = item
self.xml_element.__dirty__ = True
def remove(self, item):
if isinstance(item, Note):
try:
item = (entry for entry in self.xml_element._note_map.itervalues() if unicode(entry) == item).next()
except StopIteration:
raise KeyError(item)
elif isinstance(item, basestring):
try:
item = (entry for entry in self.xml_element._note_map.itervalues() if entry == item).next()
except StopIteration:
raise KeyError(item)
if type(item) is not self.note_type:
raise KeyError(item)
self.xml_element.element.remove(item.element)
del self.xml_element._note_map[item.element]
self.xml_element.__dirty__ = True
def update(self, sequence):
for item in sequence:
self.add(item)
def clear(self):
for item in self.xml_element._note_map.values():
self.remove(item)
class DeviceID(XMLStringElement):
_xml_tag = 'deviceID'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
## Service elements
class Basic(XMLStringElement):
_xml_tag = 'basic'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
_xml_value_type = BasicStatusValue
class Status(XMLElement):
_xml_tag = 'status'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
_xml_extension_type = StatusExtension
_xml_children_order = {Basic.qname: 0}
basic = XMLElementChild('basic', type=Basic, required=False, test_equal=True)
def __init__(self, basic=None):
XMLElement.__init__(self)
self.basic = basic
def check_validity(self):
if len(self.element) == 0:
raise ValidationError("Status objects must have at least one child")
super(Status, self).check_validity()
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.basic)
class Contact(XMLAnyURIElement):
_xml_tag = 'contact'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
priority = XMLAttribute('priority', type=float, required=False, test_equal=False)
class ServiceTimestamp(XMLDateTimeElement):
_xml_tag = 'timestamp'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
class Service(XMLElement):
_xml_tag = 'tuple'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
_xml_extension_type = ServiceExtension
_xml_children_order = {Status.qname: 0,
None: 1,
Contact.qname: 2,
PIDFNote.qname: 3,
ServiceTimestamp.qname: 4}
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
status = XMLElementChild('status', type=Status, required=True, test_equal=True)
contact = XMLElementChild('contact', type=Contact, required=False, test_equal=True)
timestamp = XMLElementChild('timestamp', type=ServiceTimestamp, required=False, test_equal=True)
device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True)
_note_map = NoteMap()
def __init__(self, id, notes=[], status=None, contact=None, timestamp=None, device_id=None):
XMLElement.__init__(self)
self.id = id
self.status = status
self.contact = contact
self.timestamp = timestamp
self.device_id = device_id
self.notes.update(notes)
@property
def notes(self):
return NoteList(self, PIDFNote)
def __repr__(self):
return '%s(%r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.status, self.contact, self.timestamp, self.device_id)
def _parse_element(self, element):
super(Service, self)._parse_element(element)
self.notes._parse_element(element)
def _build_element(self):
super(Service, self)._build_element()
self.notes._build_element()
class DeviceTimestamp(XMLDateTimeElement):
_xml_tag = 'timestamp'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
class Device(XMLElement):
_xml_tag = 'device'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
_xml_extension_type = DeviceExtension
_xml_children_order = {None: 0,
DeviceID.qname: 1,
DMNote.qname: 2,
DeviceTimestamp.qname: 3}
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True)
timestamp = XMLElementChild('timestamp', type=DeviceTimestamp, required=False, test_equal=True)
_note_map = NoteMap()
def __init__(self, id, device_id=None, notes=[], timestamp=None):
XMLElement.__init__(self)
self.id = id
self.device_id = device_id
self.timestamp = timestamp
self.notes.update(notes)
@property
def notes(self):
return NoteList(self, DMNote)
def __repr__(self):
return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.device_id, list(self.notes), self.timestamp)
def _parse_element(self, element):
super(Device, self)._parse_element(element)
self.notes._parse_element(element)
def _build_element(self):
super(Device, self)._build_element()
self.notes._build_element()
class PersonTimestamp(XMLDateTimeElement):
_xml_tag = 'timestamp'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
class Person(XMLElement):
_xml_tag = 'person'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
_xml_extension_type = PersonExtension
_xml_children_order = {None: 0,
DMNote.qname: 1,
PersonTimestamp.qname: 2}
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
timestamp = XMLElementChild('timestamp', type=PersonTimestamp, required=False, test_equal=True)
_note_map = NoteMap()
def __init__(self, id, notes=[], timestamp=None):
XMLElement.__init__(self)
self.id = id
self.timestamp = timestamp
self.notes.update(notes)
@property
def notes(self):
return NoteList(self, DMNote)
def __repr__(self):
return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.timestamp)
def _parse_element(self, element):
super(Person, self)._parse_element(element)
self.notes._parse_element(element)
def _build_element(self):
super(Person, self)._build_element()
self.notes._build_element()
class PIDF(XMLListRootElement):
_xml_tag = 'presence'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
_xml_children_order = {Service.qname: 0,
PIDFNote.qname: 1,
Person.qname: 2,
Device.qname: 3}
_xml_item_type = (Service, PIDFNote, Person, Device)
entity = XMLAttribute('entity', type=AnyURI, required=True, test_equal=True)
services = property(lambda self: (item for item in self if type(item) is Service))
notes = property(lambda self: (item for item in self if type(item) is Note))
persons = property(lambda self: (item for item in self if type(item) is Person))
devices = property(lambda self: (item for item in self if type(item) is Device))
def __init__(self, entity, elements=[]):
XMLListRootElement.__init__(self)
self.entity = entity
self.update(elements)
def __contains__(self, item):
if isinstance(item, Note):
item = PIDFNote(item, item.lang)
return super(PIDF, self).__contains__(item)
def __iter__(self):
return (unicode(item) if type(item) is PIDFNote else item for item in super(PIDF, self).__iter__())
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.entity, list(self))
def add(self, item):
if isinstance(item, Note):
item = PIDFNote(item, item.lang)
super(PIDF, self).add(item)
def remove(self, item):
if isinstance(item, Note):
try:
item = (entry for entry in super(PIDF, self).__iter__() if type(entry) is PIDFNote and unicode(entry) == item).next()
except StopIteration:
raise KeyError(item)
super(PIDF, self).remove(item)
#
# Extensions
#
agp_pidf_namespace = 'urn:ag-projects:xml:ns:pidf'
PIDFDocument.register_namespace(agp_pidf_namespace, prefix='agp-pidf')
class ExtendedStatusValue(str):
def __new__(cls, value):
if value not in ('available', 'offline', 'away', 'extended-away', 'busy'):
raise ValueError("illegal value for extended status")
return str.__new__(cls, value)
class ExtendedStatus(XMLStringElement, StatusExtension):
_xml_tag = 'extended'
_xml_namespace = agp_pidf_namespace
_xml_document = PIDFDocument
_xml_value_type = ExtendedStatusValue
Status.register_extension('extended', type=ExtendedStatus)
class DeviceInfo(XMLElement, ServiceExtension):
_xml_tag = 'device-info'
_xml_namespace = agp_pidf_namespace
_xml_document = PIDFDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
description = XMLAttribute('description', type=str, required=False, test_equal=True)
def __init__(self, id, description=None):
XMLElement.__init__(self)
self.id = id
self.description = description
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.description)
Service.register_extension('device_info', type=DeviceInfo)
diff --git a/sipsimple/payloads/policy.py b/sipsimple/payloads/policy.py
index f3ddc240..2fc1a3e9 100644
--- a/sipsimple/payloads/policy.py
+++ b/sipsimple/payloads/policy.py
@@ -1,353 +1,354 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Generic data types to be used in policy applications, according to
RFC4745.
"""
__all__ = ['namespace',
'CommonPolicyDocument',
'ConditionElement',
'ActionElement',
'TransformationElement',
'RuleExtension',
'IdentityOne',
'IdentityExcept',
'IdentityMany',
'Identity',
'Validity',
'Conditions',
'Actions',
'Transformations',
'Rule',
'RuleSet',
# Extensions
'FalseCondition',
'RuleDisplayName']
from sipsimple.payloads import ValidationError, XMLDocument, XMLElement, XMLListElement, XMLListRootElement, XMLAttribute, XMLElementID, XMLElementChild, XMLLocalizedStringElement, XMLDateTimeElement
+from sipsimple.payloads.datatypes import AnyURI, ID
namespace = 'urn:ietf:params:xml:ns:common-policy'
class CommonPolicyDocument(XMLDocument):
content_type = 'application/auth-policy+xml'
CommonPolicyDocument.register_namespace(namespace, prefix='cp', schema='common-policy.xsd')
## Mixin types for extensibility
class ConditionElement(object): pass
class ActionElement(object): pass
class TransformationElement(object): pass
class RuleExtension(object): pass
## Elements
class IdentityOne(XMLElement):
_xml_tag = 'one'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
id = XMLElementID('id', type=str, required=True, test_equal=True)
def __init__(self, id):
XMLElement.__init__(self)
self.id = id
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.id)
def __str__(self):
return self.id
def matches(self, uri):
return self.id == uri
class IdentityExcept(XMLElement):
_xml_tag = 'except'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
def _onset_id(self, attribute, value):
if value is not None:
self.domain = None
id = XMLAttribute('id', type=str, required=False, test_equal=True, onset=_onset_id)
del _onset_id
def _onset_domain(self, attribute, value):
if value is not None:
self.id = None
domain = XMLAttribute('domain', type=str, required=False, test_equal=True, onset=_onset_domain)
del _onset_domain
def __init__(self, id=None, domain=None):
XMLElement.__init__(self)
self.id = id
self.domain = domain
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.domain)
def __str__(self):
if self.id is not None:
return self.id
else:
return self.domain
def matches(self, uri):
if self.id is not None:
return self.id != uri
else:
return [self.domain] != uri.split('@', 1)[1:]
class IdentityMany(XMLListElement):
_xml_tag = 'many'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_children_order = {IdentityExcept.qname: 0}
_xml_item_type = IdentityExcept
domain = XMLAttribute('domain', type=str, required=False, test_equal=True)
def __init__(self, domain=None, exceptions=[]):
XMLListElement.__init__(self)
self.domain = domain
self.update(exceptions)
def __repr__(self):
return '%s(%r, %s)' % (self.__class__.__name__, self.domain, list(self))
def matches(self, uri):
if self.domain is not None:
if self.domain != uri.partition('@')[2]:
return False
for child in self:
if not child.matches(uri):
return False
return True
class Identity(XMLListElement):
_xml_tag = 'identity'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = (IdentityOne, IdentityMany)
def __init__(self, identities=[]):
XMLListElement.__init__(self)
self.update(identities)
def matches(self, uri):
for child in self:
if child.matches(uri):
return True
return False
class Sphere(XMLElement):
_xml_tag = 'sphere'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
value = XMLAttribute('value', type=str, required=True, test_equal=True)
def __init__(self, value):
XMLElement.__init__(self)
self.value = value
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.value)
class ValidFrom(XMLDateTimeElement):
_xml_tag = 'from'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
class ValidUntil(XMLDateTimeElement):
_xml_tag = 'until'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
class ValidityInterval(object):
def __init__(self, from_timestamp, until_timestamp):
self.valid_from = ValidFrom(from_timestamp)
self.valid_until = ValidUntil(until_timestamp)
def __eq__(self, other):
if isinstance(other, ValidityInterval):
return self is other or (self.valid_from == other.valid_from and self.valid_until == other.valid_until)
return NotImplemented
def __ne__(self, other):
if isinstance(other, ValidityInterval):
return self is not other and (self.valid_from != other.valid_from or self.valid_until != other.valid_until)
return NotImplemented
@classmethod
def from_elements(cls, from_element, until_element, xml_document=None):
instance = object.__new__(cls)
instance.valid_from = ValidFrom.from_element(from_element, xml_document)
instance.valid_until = ValidUntil.from_element(until_element, xml_document)
return instance
class Validity(XMLListElement):
_xml_tag = 'validity'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = ValidityInterval
def __init__(self, children=[]):
XMLListElement.__init__(self)
self.update(children)
def _parse_element(self, element):
iterator = iter(element)
for first_child in iterator:
second_child = iterator.next()
if first_child.tag == '{%s}from' % self._xml_namespace and second_child.tag == '{%s}until' % self._xml_namespace:
try:
item = ValidityInterval.from_elements(first_child, second_child, xml_document=self._xml_document)
except:
pass
else:
self._element_map[item.valid_from.element] = item
def _build_element(self):
for child in self:
child.valid_from.to_element()
child.valid_until.to_element()
def add(self, item):
if not isinstance(item, ValidityInterval):
raise TypeError("Validity element must be a ValidityInterval instance")
self._insert_element(item.valid_from.element)
self._insert_element(item.valid_until.element)
self._element_map[item.valid_from.element] = item
self.__dirty__ = True
def remove(self, item):
self.element.remove(item.valid_from.element)
self.element.remove(item.valid_until.element)
del self._element_map[item.valid_from.element]
self.__dirty__ = True
def check_validity(self):
if not self:
raise ValidationError("cannot have Validity element without any children")
super(Validity, self).check_validity(self)
class Conditions(XMLListElement):
_xml_tag = 'conditions'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_children_order = {Identity.qname: 0,
Sphere.qname: 1,
Validity.qname: 2}
_xml_item_type = (Identity, Sphere, Validity, ConditionElement)
def __init__(self, conditions=[]):
XMLListElement.__init__(self)
self.update(conditions)
class Actions(XMLListElement):
_xml_tag = 'actions'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = ActionElement
def __init__(self, actions=[]):
XMLListElement.__init__(self)
self.update(actions)
class Transformations(XMLListElement):
_xml_tag = 'transformations'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = TransformationElement
def __init__(self, transformations=[]):
XMLListElement.__init__(self)
self.update(transformations)
class Rule(XMLElement):
_xml_tag = 'rule'
_xml_namespace = namespace
_xml_extension_type = RuleExtension
_xml_document = CommonPolicyDocument
_xml_children_order = {Conditions.qname: 0,
Actions.qname: 1,
Transformations.qname: 2}
- id = XMLElementID('id', type=unicode, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
conditions = XMLElementChild('conditions', type=Conditions, required=False, test_equal=True)
actions = XMLElementChild('actions', type=Actions, required=False, test_equal=True)
transformations = XMLElementChild('transformations', type=Transformations, required=False, test_equal=True)
def __init__(self, id, conditions=None, actions=None, transformations=None):
XMLElement.__init__(self)
self.id = id
self.conditions = conditions
self.actions = actions
self.transformations = transformations
def __repr__(self):
return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.conditions, self.actions, self.transformations)
class RuleSet(XMLListRootElement):
_xml_tag = 'ruleset'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = Rule
def __init__(self, rules=[]):
XMLListRootElement.__init__(self)
self.update(rules)
def __getitem__(self, key):
return self._xmlid_map[Rule][key]
def __delitem__(self, key):
self.remove(self._xmlid_map[Rule][key])
#
# Extensions
#
agp_cp_namespace = 'urn:ag-projects:xml:ns:common-policy'
CommonPolicyDocument.register_namespace(agp_cp_namespace, prefix='agp-cp')
# A condition element in the AG Projects namespace, it will always be evaluated to false
# because it's not understood by servers
class FalseCondition(XMLElement, ConditionElement):
_xml_tag = 'false-condition'
_xml_namespace = agp_cp_namespace
_xml_document = CommonPolicyDocument
class RuleDisplayName(XMLLocalizedStringElement, RuleExtension):
_xml_tag = 'display-name'
_xml_namespace = agp_cp_namespace
_xml_document = CommonPolicyDocument
Rule.register_extension('display_name', RuleDisplayName)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Feb 1, 10:43 AM (22 h, 55 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3489220
Default Alt Text
(43 KB)

Event Timeline