Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7313143
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
43 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/sipsimple/payloads/addressbook.py b/sipsimple/payloads/addressbook.py
index 9489c01f..477d9174 100644
--- a/sipsimple/payloads/addressbook.py
+++ b/sipsimple/payloads/addressbook.py
@@ -1,279 +1,279 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details.
#
"""Addressbook related payload elements"""
__all__ = ['namespace', 'Group', 'Contact', 'ContactURI', 'ContactURIList', 'ElementExtension', 'ElementAttributes']
from lxml import etree
from sipsimple.payloads import XMLElement, XMLListElement, XMLStringElement, XMLBooleanElement, XMLElementID, XMLAttribute, XMLElementChild
-from sipsimple.payloads.datatypes import AnyURI
+from sipsimple.payloads.datatypes import AnyURI, ID
from sipsimple.payloads.resourcelists import ResourceListsDocument, ListElement
# XML namespace of the addressbook elements defined in this module.
namespace = 'urn:ag-projects:xml:ns:addressbook'
# Register the namespace (prefix 'addressbook', schema addressbook.xsd) with
# the resource-lists document type these elements belong to.
ResourceListsDocument.register_namespace(namespace, prefix='addressbook', schema='addressbook.xsd')
class ElementExtension(object):
    """Base type for extensions of addressbook elements.

    Elements in this module name it as their ``_xml_extension_type``.
    """
class Name(XMLStringElement):
    """String element serialized as <name> in the addressbook namespace."""

    _xml_tag = 'name'
    _xml_namespace = namespace
    _xml_document = ResourceListsDocument

    def __repr__(self):
        """Return ``ClassName(value)`` built from the repr of the element value."""
        return '%s(%r)' % (self.__class__.__name__, self.value)
class Group(XMLElement, ListElement):
_xml_tag = 'group'
_xml_namespace = namespace
_xml_extension_type = ElementExtension
_xml_document = ResourceListsDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
name = XMLElementChild('name', type=Name, required=True, test_equal=True)
def __init__(self, id, name):
XMLElement.__init__(self)
self.id = id
self.name = name
def __unicode__(self):
return unicode(self.name)
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.name)
class ContactURI(XMLElement):
_xml_tag = 'uri'
_xml_namespace = namespace
_xml_extension_type = ElementExtension
_xml_document = ResourceListsDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
uri = XMLAttribute('uri', type=AnyURI, required=True, test_equal=True)
type = XMLAttribute('type', type=unicode, required=False, test_equal=True)
def __init__(self, id, uri, type=None):
XMLElement.__init__(self)
self.id = id
self.uri = uri
self.type = type
def __unicode__(self):
return unicode(self.uri)
def __repr__(self):
return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, self.uri, self.type)
class ContactURIList(XMLListElement):
    """List element <uris> whose items are ContactURI elements."""

    _xml_tag = 'uris'
    _xml_namespace = namespace
    _xml_document = ResourceListsDocument
    _xml_item_type = ContactURI

    def __init__(self, uris=[]):
        """Create the list and add every item of `uris` to it."""
        # The default list is only passed to update() here, never mutated.
        XMLListElement.__init__(self)
        self.update(uris)
class PolicyValue(str):
    """String restricted to the keywords allow, block, ignore and confirm."""

    def __new__(cls, value):
        """Return the validated value; raise ValueError for any other string."""
        accepted = ('allow', 'block', 'ignore', 'confirm')
        if value in accepted:
            return super(PolicyValue, cls).__new__(cls, value)
        raise ValueError("Invalid policy value: %s" % value)
class Policy(XMLStringElement):
    """String element <policy>; its value type is PolicyValue."""

    _xml_tag = 'policy'
    _xml_namespace = namespace
    _xml_document = ResourceListsDocument
    _xml_value_type = PolicyValue
class Subscribe(XMLBooleanElement):
    """Boolean element <subscribe> in the addressbook namespace."""

    _xml_tag = 'subscribe'
    _xml_namespace = namespace
    _xml_document = ResourceListsDocument
class DialogHandling(XMLElement):
    """Element <dialog> holding a required policy and a required subscribe flag."""

    _xml_tag = 'dialog'
    _xml_namespace = namespace
    _xml_document = ResourceListsDocument

    # Both children are mandatory and take part in equality tests.
    policy = XMLElementChild('policy', type=Policy, required=True, test_equal=True)
    subscribe = XMLElementChild('subscribe', type=Subscribe, required=True, test_equal=True)

    def __init__(self, policy, subscribe):
        """Store `policy` and `subscribe` as the element's children."""
        XMLElement.__init__(self)
        self.policy = policy
        self.subscribe = subscribe

    def __repr__(self):
        """Return ``ClassName(policy, subscribe)``."""
        return '%s(%r, %r)' % (self.__class__.__name__, self.policy, self.subscribe)
class PresenceHandling(XMLElement):
    """Element <presence> holding a required policy and a required subscribe flag."""

    _xml_tag = 'presence'
    _xml_namespace = namespace
    _xml_document = ResourceListsDocument

    # Both children are mandatory and take part in equality tests.
    policy = XMLElementChild('policy', type=Policy, required=True, test_equal=True)
    subscribe = XMLElementChild('subscribe', type=Subscribe, required=True, test_equal=True)

    def __init__(self, policy, subscribe):
        """Store `policy` and `subscribe` as the element's children."""
        XMLElement.__init__(self)
        self.policy = policy
        self.subscribe = subscribe

    def __repr__(self):
        """Return ``ClassName(policy, subscribe)``."""
        return '%s(%r, %r)' % (self.__class__.__name__, self.policy, self.subscribe)
class Contact(XMLElement, ListElement):
_xml_tag = 'contact'
_xml_namespace = namespace
_xml_extension_type = ElementExtension
_xml_document = ResourceListsDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
group_id = XMLAttribute('group_id', type=str, required=True, test_equal=True)
name = XMLElementChild('name', type=Name, required=True, test_equal=True)
uris = XMLElementChild('uris', type=ContactURIList, required=True, test_equal=True)
dialog = XMLElementChild('dialog', type=DialogHandling, required=True, test_equal=True)
presence = XMLElementChild('presence', type=PresenceHandling, required=True, test_equal=True)
def __init__(self, id, group_id, name, uris=[], presence_handling=None, dialog_handling=None):
XMLElement.__init__(self)
self.id = id
self.group_id = group_id
self.name = name
self.uris = ContactURIList(uris)
self.dialog = dialog_handling or DialogHandling('confirm', False)
self.presence = presence_handling or PresenceHandling('confirm', False)
def __repr__(self):
return '%s(%r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.group_id, self.name, list(self.uris), self.presence, self.dialog)
#
# Extensions
#
class ElementAttributes(XMLElement, ElementExtension):
    """Dict-like <attributes> extension element.

    Each entry is serialized as an <attribute name="..."> child; a value of
    None is serialized with nil="true" instead of text content. Every
    mutation that changes the stored mapping sets ``__dirty__``.
    """

    _xml_tag = 'attributes'
    _xml_namespace = 'urn:ag-projects:sipsimple:xml:ns:addressbook'
    _xml_document = ResourceListsDocument

    def __init__(self, attributes={}):
        """Create the element and load it with the given mapping."""
        # The default dict is only read by update(), never mutated.
        XMLElement.__init__(self)
        self._attributes = dict()
        self.update(attributes)

    def _parse_element(self, element):
        """Rebuild the mapping from the <attribute> children of `element`."""
        self._attributes = dict()
        attribute_tag = '{%s}attribute' % self._xml_namespace
        for child in (child for child in element if child.tag == attribute_tag):
            if 'nil' in child.attrib:
                self[child.attrib['name']] = None
            else:
                self[child.attrib['name']] = unicode(child.text or u'')

    def _build_element(self):
        """Replace the XML children with one <attribute> per stored entry."""
        self.element.clear()
        attribute_tag = '{%s}attribute' % self._xml_namespace
        for key, value in self.iteritems():
            child = etree.SubElement(self.element, attribute_tag, nsmap=self._xml_document.nsmap)
            child.attrib['name'] = key
            if value is None:
                child.attrib['nil'] = 'true'
            else:
                child.text = value

    def __contains__(self, key):
        return key in self._attributes

    def __iter__(self):
        return iter(self._attributes)

    def __getitem__(self, key):
        return self._attributes[key]

    def __setitem__(self, key, value):
        """Store `value` under `key`, marking the element dirty on change."""
        # Test membership explicitly: comparing against get(key, None) would
        # treat "key missing" and "key stored as None" alike and silently
        # drop the assignment of None to a new key (e.g. parsed nil entries).
        if key in self._attributes and self._attributes[key] == value:
            return
        self._attributes[key] = value
        self.__dirty__ = True

    def __delitem__(self, key):
        del self._attributes[key]
        self.__dirty__ = True

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, dict(self))

    def clear(self):
        """Remove all entries; only marks dirty if there was something to remove."""
        if self._attributes:
            self._attributes.clear()
            self.__dirty__ = True

    def get(self, key, default=None):
        return self._attributes.get(key, default)

    def has_key(self, key):
        return key in self._attributes

    def items(self):
        return self._attributes.items()

    def iteritems(self):
        return self._attributes.iteritems()

    def iterkeys(self):
        return self._attributes.iterkeys()

    def itervalues(self):
        return self._attributes.itervalues()

    def keys(self):
        return self._attributes.keys()

    def pop(self, key, *args):
        """Remove `key` and return its value (or the given default).

        Raises KeyError when the key is missing and no default was given.
        """
        # Decide dirtiness by membership, not by identity with the default:
        # a stored value may be the very object passed as default.
        present = key in self._attributes
        value = self._attributes.pop(key, *args)
        if present:
            self.__dirty__ = True
        return value

    def popitem(self):
        """Remove and return an arbitrary (key, value) pair."""
        value = self._attributes.popitem()
        self.__dirty__ = True
        return value

    def setdefault(self, key, default=None):
        """Return the value for `key`, inserting `default` first if missing."""
        # Only an actual insertion changes the mapping.
        missing = key not in self._attributes
        value = self._attributes.setdefault(key, default)
        if missing:
            self.__dirty__ = True
        return value

    def update(self, attributes=(), **kw):
        """Merge `attributes` and keyword entries into the mapping."""
        self._attributes.update(attributes, **kw)
        if attributes or kw:
            self.__dirty__ = True
# Register the extension namespace under the 'sipsimple' prefix and attach
# ElementAttributes to each extensible element as its 'attributes' extension.
ResourceListsDocument.register_namespace(ElementAttributes._xml_namespace, prefix='sipsimple')
Group.register_extension('attributes', ElementAttributes)
Contact.register_extension('attributes', ElementAttributes)
ContactURI.register_extension('attributes', ElementAttributes)
diff --git a/sipsimple/payloads/datatypes.py b/sipsimple/payloads/datatypes.py
index 189da414..57854917 100644
--- a/sipsimple/payloads/datatypes.py
+++ b/sipsimple/payloads/datatypes.py
@@ -1,229 +1,238 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""Data types used for simple XML elements and for XML attributes"""
# Public names of this module; 'ID' is listed because other payload modules
# import and use it as an attribute value type.
__all__ = ['Boolean', 'DateTime', 'Byte', 'UnsignedByte', 'Short', 'UnsignedShort', 'Int', 'UnsignedInt', 'Long', 'UnsignedLong',
           'PositiveInteger', 'NegativeInteger', 'NonNegativeInteger', 'NonPositiveInteger', 'AnyURI', 'SIPURI', 'XCAPURI', 'ID']
import re
import urllib
import urlparse
from sipsimple.util import Timestamp
class Boolean(int):
    """Integer-backed boolean (0 or 1) with XML parse/build hooks.

    The constructor applies Python truthiness to its argument; use
    ``__xmlparse__`` to interpret the textual XML representation.
    """

    # Lexical forms accepted when parsing. 'True'/'False' are kept for
    # backward compatibility; '1'/'0' are the numeric forms of xs:boolean.
    _true_strings = ('True', 'true', '1')
    _false_strings = ('False', 'false', '0')

    def __new__(cls, value):
        return int.__new__(cls, bool(value))

    def __repr__(self):
        return 'True' if self else 'False'

    __str__ = __repr__

    @classmethod
    def __xmlparse__(cls, value):
        """Convert an XML boolean string into a Boolean.

        Raises ValueError for any string that is not a recognized form.
        """
        if value in cls._true_strings:
            return int.__new__(cls, 1)
        elif value in cls._false_strings:
            return int.__new__(cls, 0)
        else:
            raise ValueError("Invalid boolean string representation: %s" % value)

    def __xmlbuild__(self):
        """Return the canonical XML form, u'true' or u'false'."""
        return u'true' if self else u'false'
class DateTime(Timestamp):
    """Timestamp subclass exposing the XML parse/build hooks."""

    @classmethod
    def __xmlparse__(cls, value):
        """Parse the XML string by delegating to ``cls.parse``."""
        return cls.parse(value)

    def __xmlbuild__(self):
        """Serialize this value by delegating to ``format``."""
        # NOTE(review): format is called with self as explicit argument, so it
        # is presumably a class/static method on Timestamp -- confirm there.
        return self.format(self)
class Byte(int):
    """Integer restricted to the signed 8-bit range [-128, 127]."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = int.__new__(cls, value)
        if number < -128 or number > 127:
            raise ValueError("integer number must be a signed 8bit value")
        return number
class UnsignedByte(int):
    """Integer restricted to the unsigned 8-bit range [0, 255]."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = int.__new__(cls, value)
        if number < 0 or number > 255:
            raise ValueError("integer number must be an unsigned 8bit value")
        return number
class Short(int):
    """Integer restricted to the signed 16-bit range [-32768, 32767]."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = int.__new__(cls, value)
        if number < -32768 or number > 32767:
            raise ValueError("integer number must be a signed 16bit value")
        return number
class UnsignedShort(int):
    """Integer restricted to the unsigned 16-bit range [0, 65535]."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = int.__new__(cls, value)
        if number < 0 or number > 65535:
            raise ValueError("integer number must be an unsigned 16bit value")
        return number
class Int(long):
    """Integer restricted to the signed 32-bit range."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = long.__new__(cls, value)
        if number < -2147483648 or number > 2147483647:
            raise ValueError("integer number must be a signed 32bit value")
        return number
class UnsignedInt(long):
    """Integer restricted to the unsigned 32-bit range."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = long.__new__(cls, value)
        if number < 0 or number > 4294967295:
            raise ValueError("integer number must be an unsigned 32bit value")
        return number
class Long(long):
    """Integer restricted to the signed 64-bit range."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = long.__new__(cls, value)
        if number < -9223372036854775808 or number > 9223372036854775807:
            raise ValueError("integer number must be a signed 64bit value")
        return number
class UnsignedLong(long):
    """Integer restricted to the unsigned 64-bit range."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if out of range."""
        number = long.__new__(cls, value)
        if number < 0 or number > 18446744073709551615:
            raise ValueError("integer number must be an unsigned 64bit value")
        return number
class PositiveInteger(long):
    """Integer that must be strictly greater than zero."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError unless it is > 0."""
        number = long.__new__(cls, value)
        if number > 0:
            return number
        raise ValueError("integer number must be a positive value")
class NegativeInteger(long):
    """Integer that must be strictly less than zero."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError unless it is < 0."""
        number = long.__new__(cls, value)
        if number < 0:
            return number
        raise ValueError("integer number must be a negative value")
class NonNegativeInteger(long):
    """Integer that must be zero or greater."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if it is negative."""
        number = long.__new__(cls, value)
        if number >= 0:
            return number
        raise ValueError("integer number must be a non-negative value")
class NonPositiveInteger(long):
    """Integer that must be zero or less."""

    def __new__(cls, value):
        """Convert `value` to an integer; raise ValueError if it is positive."""
        number = long.__new__(cls, value)
        if number <= 0:
            return number
        raise ValueError("integer number must be a non-positive value")
class AnyURI(unicode):
    """Unicode string holding a URI, percent-encoded in its XML form."""

    @classmethod
    def __xmlparse__(cls, value):
        """Build an instance from XML text: percent-decode, then decode UTF-8."""
        return cls.__new__(cls, urllib.unquote(value).decode('utf-8'))

    def __xmlbuild__(self):
        """Return the XML text: UTF-8 encode, then percent-encode."""
        # NOTE(review): urllib.quote is called with its default `safe` set, so
        # characters such as ':' and '@' are escaped as well -- confirm that
        # consumers of the document expect that.
        return urllib.quote(self.encode('utf-8'))
class SIPURI(AnyURI):
    """AnyURI validated as a sip:/sips: URI and split into its components.

    Instances carry the attributes ``scheme``, ``username``, ``password``,
    ``domain``, ``params`` (dict, value None for flag parameters) and
    ``headers`` (dict). Any malformed part raises ValueError.
    """

    _path_regex = re.compile(r'^((?P<username>[^:@]+)(:(?P<password>[^@]+))?@)?(?P<domain>.*)$')

    def __new__(cls, value):
        instance = AnyURI.__new__(cls, value)
        uri = urlparse.urlparse(instance)
        if uri.scheme not in ('sip', 'sips'):
            raise ValueError("illegal scheme for SIP URI: %s" % uri.scheme)
        # The pattern can fail to match (e.g. a path containing a newline);
        # report that as ValueError like every other validation failure
        # instead of failing with AttributeError on None.
        match = cls._path_regex.match(uri.path)
        if match is None:
            raise ValueError("illegal SIP URI: %s" % uri.path)
        instance.scheme = uri.scheme
        instance.__dict__.update(match.groupdict())
        instance.params = {}
        if uri.params:
            params = (param.split('=', 1) for param in uri.params.split(';'))
            for param in params:
                if not param[0]:
                    raise ValueError("illegal SIP URI parameter name: %s" % param[0])
                if len(param) == 1:
                    # Parameter without a value, e.g. ";lr"
                    param.append(None)
                elif '=' in param[1]:
                    raise ValueError("illegal SIP URI parameter value: %s" % param[1])
                instance.params[param[0]] = param[1]
        if uri.query:
            try:
                # dict() raises ValueError unless every header is exactly name=value
                instance.headers = dict(header.split('=') for header in uri.query.split('&'))
            except ValueError:
                raise ValueError("illegal SIP URI headers: %s" % uri.query)
            else:
                for name, value in instance.headers.iteritems():
                    if not name or not value:
                        raise ValueError("illegal URI header: %s=%s" % (name, value))
        else:
            instance.headers = {}
        return instance
class XCAPURI(AnyURI):
    """AnyURI validated as an XCAP URI and split into its components.

    Instances carry ``scheme``, ``username``, ``password``, ``hostname``,
    ``port``, the path groups ``root``, ``auid``, ``globaltree`` (bool),
    ``userstree``, ``document`` and ``node``, and ``query`` (dict).
    Any malformed part raises ValueError.
    """

    _path_regex = re.compile(r'^(?P<root>/(([^/]+)/)*)?(?P<auid>[^/]+)/((?P<globaltree>global)|(users/(?P<userstree>[^/]+)))/(?P<document>~?(([^~]+~)|([^~]+))*)(/~~(?P<node>.*))?$')

    def __new__(cls, value):
        instance = AnyURI.__new__(cls, value)
        uri = urlparse.urlparse(instance)
        if uri.scheme not in ('http', 'https', ''):
            raise ValueError("illegal scheme for XCAP URI: %s" % uri.scheme)
        # A path that is not an XCAP path does not match; report it as
        # ValueError like the other validation failures instead of failing
        # with AttributeError on None.
        match = cls._path_regex.match(uri.path)
        if match is None:
            raise ValueError("illegal path for XCAP URI: %s" % uri.path)
        instance.scheme = uri.scheme
        instance.username = uri.username
        instance.password = uri.password
        instance.hostname = uri.hostname
        instance.port = uri.port
        instance.__dict__.update(match.groupdict())
        # The regex group holds 'global' or None; expose it as a boolean.
        instance.globaltree = instance.globaltree is not None
        if uri.query:
            try:
                # dict() raises ValueError unless every item is exactly name=value
                instance.query = dict(header.split('=') for header in uri.query.split('&'))
            except ValueError:
                raise ValueError("illegal XCAP URI query string: %s" % uri.query)
            else:
                for name, value in instance.query.iteritems():
                    if not name or not value:
                        raise ValueError("illegal XCAP URI query parameter: %s=%s" % (name, value))
        else:
            instance.query = {}
        return instance

    # True when the URI has no scheme.
    relative = property(lambda self: self.scheme == '')
class ID(str):
    """String usable as an element identifier.

    It must start with an ASCII letter or underscore and continue with ASCII
    letters, digits, underscores, dots or hyphens only.
    """

    # Anchored at both ends (\Z also rejects a trailing newline); match()
    # alone only anchors the start, which would accept any string that merely
    # begins with a valid character.
    _id_regex = re.compile(r'^[a-z_][a-z0-9_\.\-]*\Z', re.I)

    def __new__(cls, value):
        """Return the validated identifier; raise ValueError if it is illegal."""
        if not cls._id_regex.match(value):
            raise ValueError("illegal ID value: %s" % value)
        return str.__new__(cls, value)
+
+
diff --git a/sipsimple/payloads/pidf.py b/sipsimple/payloads/pidf.py
index fcce3d2b..fd71fc49 100644
--- a/sipsimple/payloads/pidf.py
+++ b/sipsimple/payloads/pidf.py
@@ -1,467 +1,467 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""PIDF handling according to RFC3863 and RFC4479"""
__all__ = ['pidf_namespace',
'dm_namespace',
'PIDFDocument',
'ServiceExtension',
'DeviceExtension',
'PersonExtension',
'StatusExtension',
'Note',
'DeviceID',
'Status',
'Basic',
'Contact',
'ServiceTimestamp',
'Service',
'DeviceTimestamp',
'Device',
'PersonTimestamp',
'Person',
'PIDF',
# Extensions
'ExtendedStatus']
import weakref
from sipsimple.payloads import ValidationError, XMLDocument, XMLListRootElement, XMLElement, XMLAttribute, XMLElementID, XMLElementChild
from sipsimple.payloads import XMLStringElement, XMLLocalizedStringElement, XMLDateTimeElement, XMLAnyURIElement
-from sipsimple.payloads.datatypes import AnyURI
+from sipsimple.payloads.datatypes import AnyURI, ID
pidf_namespace = 'urn:ietf:params:xml:ns:pidf'
dm_namespace = 'urn:ietf:params:xml:ns:pidf:data-model'
class PIDFDocument(XMLDocument):
    """XML document type for PIDF payloads (application/pidf+xml)."""

    content_type = 'application/pidf+xml'

# PIDF is registered as the default (unprefixed) namespace, the data model
# namespace under the 'dm' prefix.
PIDFDocument.register_namespace(pidf_namespace, prefix=None, schema='pidf.xsd')
PIDFDocument.register_namespace(dm_namespace, prefix='dm', schema='data-model.xsd')
## Marker mixin
class ServiceExtension(object):
    """Marker base for extensions of Service elements."""


class DeviceExtension(object):
    """Marker base for extensions of Device elements."""


class PersonExtension(object):
    """Marker base for extensions of Person elements."""


class StatusExtension(object):
    """Marker base for extensions of Status elements."""
## Attribute value types
class BasicStatusValue(str):
    """String restricted to the basic status keywords 'closed' and 'open'."""

    def __new__(cls, value):
        """Return the validated value; raise ValueError for any other string."""
        if value in ('closed', 'open'):
            return str.__new__(cls, value)
        raise ValueError('illegal BasicStatusValue')
## General elements
class Note(unicode):
    """Unicode text with an optional language tag in ``lang``.

    Two notes are equal when both text and language match; a note equals a
    plain string only when it has no language.
    """

    def __new__(cls, value, lang=None):
        note = unicode.__new__(cls, value)
        note.lang = lang
        return note

    def __repr__(self):
        text_repr = unicode.__repr__(self)
        return "%s(%s, lang=%r)" % (self.__class__.__name__, text_repr, self.lang)

    def __eq__(self, other):
        if isinstance(other, Note):
            return unicode.__eq__(self, other) and self.lang == other.lang
        if isinstance(other, basestring):
            return self.lang is None and unicode.__eq__(self, other)
        return NotImplemented

    def __ne__(self, other):
        # Mirror __eq__, passing NotImplemented through untouched.
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return NotImplemented
        return not outcome
class PIDFNote(XMLLocalizedStringElement):
    """Localized string element <note> in the PIDF namespace."""

    _xml_tag = 'note'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument

    def __unicode__(self):
        """Return the element's value and language as a Note."""
        return Note(self.value, self.lang)
class DMNote(XMLLocalizedStringElement):
    """Localized string element <note> in the data-model namespace."""

    _xml_tag = 'note'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument

    def __unicode__(self):
        """Return the element's value and language as a Note."""
        return Note(self.value, self.lang)
class NoteMap(object):
    """Descriptor to be used for _note_map attributes on XML elements with notes"""

    def __init__(self):
        # One dict per owning instance, held weakly so that the map never
        # keeps an element alive.
        self.object_map = weakref.WeakKeyDictionary()

    def __get__(self, obj, type):
        # Instance access yields (creating it on first use) that instance's
        # dict; class access yields the descriptor itself.
        if obj is not None:
            return self.object_map.setdefault(obj, {})
        return self

    def __set__(self, obj, value):
        raise AttributeError("cannot set attribute")

    def __delete__(self, obj):
        raise AttributeError("cannot delete attribute")
class NoteList(object):
    """Collection view over the notes stored in an element's ``_note_map``.

    `xml_element` is the owning element and `note_type` the element class
    used for its notes. Note instances and plain strings given to the
    collection methods are wrapped in `note_type` first.
    """

    def __init__(self, xml_element, note_type):
        self.xml_element = xml_element
        self.note_type = note_type

    def _as_note_element(self, item):
        """Wrap a Note or plain string in note_type; pass anything else through."""
        if isinstance(item, Note):
            return self.note_type(item, item.lang)
        if isinstance(item, basestring):
            return self.note_type(item)
        return item

    def __contains__(self, item):
        candidate = self._as_note_element(item)
        return candidate in self.xml_element._note_map.itervalues()

    def __iter__(self):
        # Follow the order of the XML children, skipping non-note children.
        note_map = self.xml_element._note_map
        return (unicode(note_map[child]) for child in self.xml_element.element if child in note_map)

    def __len__(self):
        return len(self.xml_element._note_map)

    def __nonzero__(self):
        return len(self.xml_element._note_map) > 0

    def _parse_element(self, element):
        """Rebuild the note map from the note children of `element`."""
        note_map = self.xml_element._note_map
        note_map.clear()
        for child in element:
            if child.tag != self.note_type.qname:
                continue
            try:
                note = self.note_type.from_element(child, xml_document=self.xml_element._xml_document)
            except ValidationError:
                # Invalid notes are skipped.
                continue
            note_map[note.element] = note

    def _build_element(self):
        for note in self.xml_element._note_map.itervalues():
            note.to_element()

    def add(self, item):
        """Insert a note; raise TypeError if it is not of this list's note type."""
        item = self._as_note_element(item)
        if type(item) is not self.note_type:
            raise TypeError("%s cannot add notes of type %s" % (self.xml_element.__class__.__name__, item.__class__.__name__))
        owner = self.xml_element
        owner._insert_element(item.element)
        owner._note_map[item.element] = item
        owner.__dirty__ = True

    def remove(self, item):
        """Remove a note; raise KeyError if it is not present."""
        note_map = self.xml_element._note_map
        if isinstance(item, Note):
            for entry in note_map.itervalues():
                if unicode(entry) == item:
                    item = entry
                    break
            else:
                raise KeyError(item)
        elif isinstance(item, basestring):
            for entry in note_map.itervalues():
                if entry == item:
                    item = entry
                    break
            else:
                raise KeyError(item)
        if type(item) is not self.note_type:
            raise KeyError(item)
        self.xml_element.element.remove(item.element)
        del note_map[item.element]
        self.xml_element.__dirty__ = True

    def update(self, sequence):
        for entry in sequence:
            self.add(entry)

    def clear(self):
        # values() yields a separate list, so removing while looping is safe.
        for note in self.xml_element._note_map.values():
            self.remove(note)
class DeviceID(XMLStringElement):
    """String element <deviceID> in the data-model namespace."""

    _xml_tag = 'deviceID'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
## Service elements
class Basic(XMLStringElement):
    """String element <basic>; its value type is BasicStatusValue."""

    _xml_tag = 'basic'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_value_type = BasicStatusValue
class Status(XMLElement):
    """Element <status> with an optional <basic> child and StatusExtension children."""

    _xml_tag = 'status'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_extension_type = StatusExtension
    _xml_children_order = {Basic.qname: 0}

    basic = XMLElementChild('basic', type=Basic, required=False, test_equal=True)

    def __init__(self, basic=None):
        """Create the element, optionally with a basic status."""
        XMLElement.__init__(self)
        self.basic = basic

    def check_validity(self):
        """Raise ValidationError if the element has no children at all."""
        # Checks the underlying XML element, so extension children count too.
        if len(self.element) == 0:
            raise ValidationError("Status objects must have at least one child")
        super(Status, self).check_validity()

    def __repr__(self):
        """Return ``ClassName(basic)``."""
        return '%s(%r)' % (self.__class__.__name__, self.basic)
class Contact(XMLAnyURIElement):
    """URI element <contact> in the PIDF namespace with an optional priority."""

    _xml_tag = 'contact'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument

    # Optional float attribute; not considered when testing equality.
    priority = XMLAttribute('priority', type=float, required=False, test_equal=False)
class ServiceTimestamp(XMLDateTimeElement):
    """Date-time element <timestamp> in the PIDF namespace."""

    _xml_tag = 'timestamp'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
class Service(XMLElement):
_xml_tag = 'tuple'
_xml_namespace = pidf_namespace
_xml_document = PIDFDocument
_xml_extension_type = ServiceExtension
_xml_children_order = {Status.qname: 0,
None: 1,
Contact.qname: 2,
PIDFNote.qname: 3,
ServiceTimestamp.qname: 4}
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
status = XMLElementChild('status', type=Status, required=True, test_equal=True)
contact = XMLElementChild('contact', type=Contact, required=False, test_equal=True)
timestamp = XMLElementChild('timestamp', type=ServiceTimestamp, required=False, test_equal=True)
device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True)
_note_map = NoteMap()
def __init__(self, id, notes=[], status=None, contact=None, timestamp=None, device_id=None):
XMLElement.__init__(self)
self.id = id
self.status = status
self.contact = contact
self.timestamp = timestamp
self.device_id = device_id
self.notes.update(notes)
@property
def notes(self):
return NoteList(self, PIDFNote)
def __repr__(self):
return '%s(%r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.status, self.contact, self.timestamp, self.device_id)
def _parse_element(self, element):
super(Service, self)._parse_element(element)
self.notes._parse_element(element)
def _build_element(self):
super(Service, self)._build_element()
self.notes._build_element()
class DeviceTimestamp(XMLDateTimeElement):
    """Date-time element <timestamp> in the data-model namespace (used by Device)."""

    _xml_tag = 'timestamp'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
class Device(XMLElement):
_xml_tag = 'device'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
_xml_extension_type = DeviceExtension
_xml_children_order = {None: 0,
DeviceID.qname: 1,
DMNote.qname: 2,
DeviceTimestamp.qname: 3}
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True)
timestamp = XMLElementChild('timestamp', type=DeviceTimestamp, required=False, test_equal=True)
_note_map = NoteMap()
def __init__(self, id, device_id=None, notes=[], timestamp=None):
XMLElement.__init__(self)
self.id = id
self.device_id = device_id
self.timestamp = timestamp
self.notes.update(notes)
@property
def notes(self):
return NoteList(self, DMNote)
def __repr__(self):
return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.device_id, list(self.notes), self.timestamp)
def _parse_element(self, element):
super(Device, self)._parse_element(element)
self.notes._parse_element(element)
def _build_element(self):
super(Device, self)._build_element()
self.notes._build_element()
class PersonTimestamp(XMLDateTimeElement):
    """Date-time element <timestamp> in the data-model namespace (used by Person)."""

    _xml_tag = 'timestamp'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
class Person(XMLElement):
_xml_tag = 'person'
_xml_namespace = dm_namespace
_xml_document = PIDFDocument
_xml_extension_type = PersonExtension
_xml_children_order = {None: 0,
DMNote.qname: 1,
PersonTimestamp.qname: 2}
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
timestamp = XMLElementChild('timestamp', type=PersonTimestamp, required=False, test_equal=True)
_note_map = NoteMap()
def __init__(self, id, notes=[], timestamp=None):
XMLElement.__init__(self)
self.id = id
self.timestamp = timestamp
self.notes.update(notes)
@property
def notes(self):
return NoteList(self, DMNote)
def __repr__(self):
return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.timestamp)
def _parse_element(self, element):
super(Person, self)._parse_element(element)
self.notes._parse_element(element)
def _build_element(self):
super(Person, self)._build_element()
self.notes._build_element()
class PIDF(XMLListRootElement):
    """Root element <presence> holding Service, note, Person and Device items.

    Notes are stored as PIDFNote elements but are accepted and yielded as
    Note values: add/remove/contains convert a Note to or from PIDFNote, and
    iteration converts PIDFNote items with unicode().
    """

    _xml_tag = 'presence'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_children_order = {Service.qname: 0,
                           PIDFNote.qname: 1,
                           Person.qname: 2,
                           Device.qname: 3}
    _xml_item_type = (Service, PIDFNote, Person, Device)

    entity = XMLAttribute('entity', type=AnyURI, required=True, test_equal=True)

    # Generators over the items of one exact type; they go through __iter__,
    # so notes are seen here after their conversion by unicode().
    services = property(lambda self: (item for item in self if type(item) is Service))
    notes = property(lambda self: (item for item in self if type(item) is Note))
    persons = property(lambda self: (item for item in self if type(item) is Person))
    devices = property(lambda self: (item for item in self if type(item) is Device))

    def __init__(self, entity, elements=[]):
        """Create the document root for `entity` and add all `elements`."""
        # The default list is only passed to update() here, never mutated.
        XMLListRootElement.__init__(self)
        self.entity = entity
        self.update(elements)

    def __contains__(self, item):
        """Membership test; a Note is wrapped in PIDFNote before the lookup."""
        if isinstance(item, Note):
            item = PIDFNote(item, item.lang)
        return super(PIDF, self).__contains__(item)

    def __iter__(self):
        """Iterate over the items, converting PIDFNote items with unicode()."""
        return (unicode(item) if type(item) is PIDFNote else item for item in super(PIDF, self).__iter__())

    def __repr__(self):
        """Return ``ClassName(entity, [items])``."""
        return '%s(%r, %r)' % (self.__class__.__name__, self.entity, list(self))

    def add(self, item):
        """Add an item; a Note is wrapped in PIDFNote first."""
        if isinstance(item, Note):
            item = PIDFNote(item, item.lang)
        super(PIDF, self).add(item)

    def remove(self, item):
        """Remove an item; a Note is resolved to its stored PIDFNote first.

        Raises KeyError when no stored PIDFNote matches the given Note.
        """
        if isinstance(item, Note):
            try:
                # Search the raw items (base-class iteration) for the first
                # PIDFNote whose Note form equals the requested one.
                item = (entry for entry in super(PIDF, self).__iter__() if type(entry) is PIDFNote and unicode(entry) == item).next()
            except StopIteration:
                raise KeyError(item)
        super(PIDF, self).remove(item)
#
# Extensions
#
# Namespace of the AG Projects PIDF extension elements defined below,
# registered with PIDF documents under the 'agp-pidf' prefix.
agp_pidf_namespace = 'urn:ag-projects:xml:ns:pidf'
PIDFDocument.register_namespace(agp_pidf_namespace, prefix='agp-pidf')
class ExtendedStatusValue(str):
    """String restricted to available, offline, away, extended-away and busy."""

    def __new__(cls, value):
        """Return the validated value; raise ValueError for any other string."""
        if value in ('available', 'offline', 'away', 'extended-away', 'busy'):
            return str.__new__(cls, value)
        raise ValueError("illegal value for extended status")
class ExtendedStatus(XMLStringElement, StatusExtension):
    """String element <extended> in the agp-pidf namespace; value type ExtendedStatusValue."""

    _xml_tag = 'extended'
    _xml_namespace = agp_pidf_namespace
    _xml_document = PIDFDocument
    _xml_value_type = ExtendedStatusValue

# Attach the element to Status as its 'extended' extension.
Status.register_extension('extended', type=ExtendedStatus)
class DeviceInfo(XMLElement, ServiceExtension):
_xml_tag = 'device-info'
_xml_namespace = agp_pidf_namespace
_xml_document = PIDFDocument
- id = XMLElementID('id', type=str, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
description = XMLAttribute('description', type=str, required=False, test_equal=True)
def __init__(self, id, description=None):
XMLElement.__init__(self)
self.id = id
self.description = description
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.description)
Service.register_extension('device_info', type=DeviceInfo)
diff --git a/sipsimple/payloads/policy.py b/sipsimple/payloads/policy.py
index f3ddc240..2fc1a3e9 100644
--- a/sipsimple/payloads/policy.py
+++ b/sipsimple/payloads/policy.py
@@ -1,353 +1,354 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Generic data types to be used in policy applications, according to
RFC4745.
"""
__all__ = ['namespace',
'CommonPolicyDocument',
'ConditionElement',
'ActionElement',
'TransformationElement',
'RuleExtension',
'IdentityOne',
'IdentityExcept',
'IdentityMany',
'Identity',
'Validity',
'Conditions',
'Actions',
'Transformations',
'Rule',
'RuleSet',
# Extensions
'FalseCondition',
'RuleDisplayName']
from sipsimple.payloads import ValidationError, XMLDocument, XMLElement, XMLListElement, XMLListRootElement, XMLAttribute, XMLElementID, XMLElementChild, XMLLocalizedStringElement, XMLDateTimeElement
+from sipsimple.payloads.datatypes import AnyURI, ID
namespace = 'urn:ietf:params:xml:ns:common-policy'
class CommonPolicyDocument(XMLDocument):
    """XML document type for common-policy payloads (application/auth-policy+xml)."""

    content_type = 'application/auth-policy+xml'

# Register the common-policy namespace under the 'cp' prefix.
CommonPolicyDocument.register_namespace(namespace, prefix='cp', schema='common-policy.xsd')
## Mixin types for extensibility
class ConditionElement(object):
    """Mixin marking an element type as a condition."""


class ActionElement(object):
    """Mixin marking an element type as an action."""


class TransformationElement(object):
    """Mixin marking an element type as a transformation."""


class RuleExtension(object):
    """Mixin marking an element type as a rule extension."""
## Elements
class IdentityOne(XMLElement):
    """Element <one>: identifies exactly the URI stored in its id."""

    _xml_tag = 'one'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument

    id = XMLElementID('id', type=str, required=True, test_equal=True)

    def __init__(self, id):
        """Create the element for the given id."""
        XMLElement.__init__(self)
        self.id = id

    def __repr__(self):
        """Return ``ClassName(id)``."""
        return '%s(%r)' % (self.__class__.__name__, self.id)

    def __str__(self):
        """Return the id."""
        return self.id

    def matches(self, uri):
        """Return True when `uri` equals the id."""
        return self.id == uri
class IdentityExcept(XMLElement):
    """Element <except>: excludes either a single id or a whole domain.

    The `id` and `domain` attributes are mutually exclusive: assigning a
    non-None value to one resets the other to None.
    """

    _xml_tag = 'except'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument

    # The onset callbacks are only needed while the attributes are being
    # declared, so each helper is deleted from the class body afterwards.
    def _onset_id(self, attribute, value):
        if value is not None:
            self.domain = None
    id = XMLAttribute('id', type=str, required=False, test_equal=True, onset=_onset_id)
    del _onset_id

    def _onset_domain(self, attribute, value):
        if value is not None:
            self.id = None
    domain = XMLAttribute('domain', type=str, required=False, test_equal=True, onset=_onset_domain)
    del _onset_domain

    def __init__(self, id=None, domain=None):
        """Create the exception; `domain` is assigned last, so it wins if both are given."""
        XMLElement.__init__(self)
        self.id = id
        self.domain = domain

    def __repr__(self):
        """Return ``ClassName(id, domain)``."""
        return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.domain)

    def __str__(self):
        """Return the id when set, otherwise the domain."""
        if self.id is not None:
            return self.id
        else:
            return self.domain

    def matches(self, uri):
        """Return True when `uri` is NOT covered by this exception."""
        if self.id is not None:
            return self.id != uri
        else:
            # Compare against the part after the first '@'. A uri without
            # '@' gives an empty list here and therefore always matches.
            return [self.domain] != uri.split('@', 1)[1:]
class IdentityMany(XMLListElement):
    """Element <many>: an optional domain plus a list of IdentityExcept items."""

    _xml_tag = 'many'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
    _xml_children_order = {IdentityExcept.qname: 0}
    _xml_item_type = IdentityExcept

    domain = XMLAttribute('domain', type=str, required=False, test_equal=True)

    def __init__(self, domain=None, exceptions=[]):
        """Create the element with the given domain and exception items."""
        # The default list is only passed to update() here, never mutated.
        XMLListElement.__init__(self)
        self.domain = domain
        self.update(exceptions)

    def __repr__(self):
        """Return ``ClassName(domain, [exceptions])``."""
        return '%s(%r, %s)' % (self.__class__.__name__, self.domain, list(self))

    def matches(self, uri):
        """Return True when `uri` is in the domain (if set) and hit by no exception."""
        if self.domain is not None:
            # The domain is whatever follows the first '@' of the uri.
            if self.domain != uri.partition('@')[2]:
                return False
        for child in self:
            if not child.matches(uri):
                return False
        return True
class Identity(XMLListElement):
    """Element <identity>: a list of IdentityOne and IdentityMany items."""

    _xml_tag = 'identity'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
    _xml_item_type = (IdentityOne, IdentityMany)

    def __init__(self, identities=[]):
        """Create the element and add all `identities` to it."""
        # The default list is only passed to update() here, never mutated.
        XMLListElement.__init__(self)
        self.update(identities)

    def matches(self, uri):
        """Return True when at least one child matches `uri`."""
        for child in self:
            if child.matches(uri):
                return True
        return False
class Sphere(XMLElement):
    """Element <sphere> carrying a required string `value` attribute."""

    _xml_tag = 'sphere'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument

    value = XMLAttribute('value', type=str, required=True, test_equal=True)

    def __init__(self, value):
        """Create the element with the given value."""
        XMLElement.__init__(self)
        self.value = value

    def __repr__(self):
        """Return ``ClassName(value)``."""
        return '%s(%r)' % (self.__class__.__name__, self.value)
class ValidFrom(XMLDateTimeElement):
    """Date-time element <from> in the common-policy namespace."""

    _xml_tag = 'from'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
class ValidUntil(XMLDateTimeElement):
    """Date-time element <until> in the common-policy namespace."""

    _xml_tag = 'until'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
class ValidityInterval(object):
    """Pair of a ValidFrom and a ValidUntil element describing one interval."""

    def __init__(self, from_timestamp, until_timestamp):
        """Wrap both timestamps in their ValidFrom / ValidUntil elements."""
        self.valid_from = ValidFrom(from_timestamp)
        self.valid_until = ValidUntil(until_timestamp)

    def __eq__(self, other):
        """Intervals are equal when both of their elements are equal."""
        if isinstance(other, ValidityInterval):
            return self is other or (self.valid_from == other.valid_from and self.valid_until == other.valid_until)
        return NotImplemented

    def __ne__(self, other):
        """Intervals differ when at least one of their elements differs."""
        if isinstance(other, ValidityInterval):
            return self is not other and (self.valid_from != other.valid_from or self.valid_until != other.valid_until)
        return NotImplemented

    @classmethod
    def from_elements(cls, from_element, until_element, xml_document=None):
        """Build an interval from two already parsed XML elements."""
        # Bypass __init__, which would create new elements from timestamps.
        instance = object.__new__(cls)
        instance.valid_from = ValidFrom.from_element(from_element, xml_document)
        instance.valid_until = ValidUntil.from_element(until_element, xml_document)
        return instance
class Validity(XMLListElement):
    """``validity`` element holding ValidityInterval items.

    Each item contributes two consecutive XML children (its ``valid_from``
    element followed by its ``valid_until`` element). Items are tracked in
    ``_element_map`` keyed by the ``valid_from`` XML element.
    """

    _xml_tag = 'validity'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
    _xml_item_type = ValidityInterval

    def __init__(self, children=[]):
        """Add every ValidityInterval of ``children`` through ``update``."""
        XMLListElement.__init__(self)
        self.update(children)

    def _parse_element(self, element):
        """Rebuild the intervals from consecutive child pairs of ``element``.

        Children are consumed two at a time; a pair is kept only when its tags
        are ``from`` then ``until`` in this element's namespace and both
        children parse. A trailing unpaired child is ignored.
        """
        from_tag = '{%s}from' % self._xml_namespace
        until_tag = '{%s}until' % self._xml_namespace
        iterator = iter(element)
        for first_child in iterator:
            # next() with a default works on Python 2.6+ and 3, and avoids a
            # StopIteration escaping this method when the child count is odd.
            second_child = next(iterator, None)
            if second_child is None:
                break
            if first_child.tag == from_tag and second_child.tag == until_tag:
                try:
                    item = ValidityInterval.from_elements(first_child, second_child, xml_document=self._xml_document)
                except Exception:
                    # Best effort: a pair that cannot be parsed is skipped, but
                    # KeyboardInterrupt/SystemExit are no longer swallowed.
                    pass
                else:
                    self._element_map[item.valid_from.element] = item

    def _build_element(self):
        """Ask both members of every interval to serialize via ``to_element``."""
        for child in self:
            child.valid_from.to_element()
            child.valid_until.to_element()

    def add(self, item):
        """Insert the two XML elements of ``item`` and register it.

        Raises TypeError when ``item`` is not a ValidityInterval.
        """
        if not isinstance(item, ValidityInterval):
            raise TypeError("Validity element must be a ValidityInterval instance")
        self._insert_element(item.valid_from.element)
        self._insert_element(item.valid_until.element)
        self._element_map[item.valid_from.element] = item
        self.__dirty__ = True

    def remove(self, item):
        """Remove both XML elements of ``item`` and drop it from ``_element_map``."""
        self.element.remove(item.valid_from.element)
        self.element.remove(item.valid_until.element)
        del self._element_map[item.valid_from.element]
        self.__dirty__ = True

    def check_validity(self):
        """Raise ValidationError when empty, then defer to the parent class check."""
        if not self:
            raise ValidationError("cannot have Validity element without any children")
        # The bound super() method already carries self; the original passed it a second time.
        super(Validity, self).check_validity()
class Conditions(XMLListElement):
    """``conditions`` element: a list of Identity, Sphere, Validity and ConditionElement items."""

    _xml_tag = 'conditions'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
    _xml_children_order = {
        Identity.qname: 0,
        Sphere.qname: 1,
        Validity.qname: 2,
    }
    _xml_item_type = (Identity, Sphere, Validity, ConditionElement)

    def __init__(self, conditions=[]):
        """Add every item of ``conditions`` through ``update``."""
        XMLListElement.__init__(self)
        self.update(conditions)
class Actions(XMLListElement):
    """``actions`` element: a list of ActionElement items."""

    _xml_tag = 'actions'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
    _xml_item_type = ActionElement

    def __init__(self, actions=[]):
        """Add every item of ``actions`` through ``update``."""
        XMLListElement.__init__(self)
        self.update(actions)
class Transformations(XMLListElement):
    """``transformations`` element: a list of TransformationElement items."""

    _xml_tag = 'transformations'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
    _xml_item_type = TransformationElement

    def __init__(self, transformations=[]):
        """Add every item of ``transformations`` through ``update``."""
        XMLListElement.__init__(self)
        self.update(transformations)
class Rule(XMLElement):
    """``rule`` element with a required ``id`` and optional child elements.

    The optional children are ``conditions``, ``actions`` and
    ``transformations``; extensions of type RuleExtension can be registered
    on this class.
    """
    _xml_tag = 'rule'
    _xml_namespace = namespace
    _xml_extension_type = RuleExtension
    _xml_document = CommonPolicyDocument
    # presumably the relative order of the children when serialized -- TODO confirm
    _xml_children_order = {Conditions.qname: 0,
                           Actions.qname: 1,
                           Transformations.qname: 2}
    # NOTE(review): the next two lines are the removed (-) and added (+) sides
    # of the diff hunk shown on this page: the id type changes from unicode to ID.
- id = XMLElementID('id', type=unicode, required=True, test_equal=True)
+ id = XMLElementID('id', type=ID, required=True, test_equal=True)
    conditions = XMLElementChild('conditions', type=Conditions, required=False, test_equal=True)
    actions = XMLElementChild('actions', type=Actions, required=False, test_equal=True)
    transformations = XMLElementChild('transformations', type=Transformations, required=False, test_equal=True)
    def __init__(self, id, conditions=None, actions=None, transformations=None):
        # Only ``id`` is mandatory; the three children default to None.
        XMLElement.__init__(self)
        self.id = id
        self.conditions = conditions
        self.actions = actions
        self.transformations = transformations
    def __repr__(self):
        return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.conditions, self.actions, self.transformations)
class RuleSet(XMLListRootElement):
    """``ruleset`` root element: a list of Rule items addressable by key."""

    _xml_tag = 'ruleset'
    _xml_namespace = namespace
    _xml_document = CommonPolicyDocument
    _xml_item_type = Rule

    def __init__(self, rules=[]):
        """Add every item of ``rules`` through ``update``."""
        XMLListRootElement.__init__(self)
        self.update(rules)

    def __getitem__(self, key):
        """Look ``key`` up in the Rule section of ``_xmlid_map``."""
        rules_by_id = self._xmlid_map[Rule]
        return rules_by_id[key]

    def __delitem__(self, key):
        """Remove the rule found under ``key`` in the Rule section of ``_xmlid_map``."""
        rule = self._xmlid_map[Rule][key]
        self.remove(rule)
#
# Extensions
#
# Namespace used by the extension elements defined below; it is registered
# with CommonPolicyDocument under the 'agp-cp' prefix.
agp_cp_namespace = 'urn:ag-projects:xml:ns:common-policy'
CommonPolicyDocument.register_namespace(agp_cp_namespace, prefix='agp-cp')
# A condition element in the AG Projects namespace. It always evaluates to
# false because servers do not understand it.
class FalseCondition(XMLElement, ConditionElement):
    """``false-condition`` element in the ``agp_cp_namespace`` namespace."""

    _xml_tag = 'false-condition'
    _xml_namespace = agp_cp_namespace
    _xml_document = CommonPolicyDocument
class RuleDisplayName(XMLLocalizedStringElement, RuleExtension):
    """``display-name`` localized string element in the ``agp_cp_namespace`` namespace."""

    _xml_tag = 'display-name'
    _xml_namespace = agp_cp_namespace
    _xml_document = CommonPolicyDocument


# Register the extension on Rule under the name 'display_name'.
Rule.register_extension('display_name', RuleDisplayName)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Feb 1, 10:43 AM (22 h, 55 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3489220
Default Alt Text
(43 KB)
Attached To
Mode
rPYNSIPSIMPLE python3-sipsimple
Attached
Detach File
Event Timeline
Log In to Comment