diff --git a/xcap/authentication.py b/xcap/authentication.py
deleted file mode 100644
index f0c154f..0000000
--- a/xcap/authentication.py
+++ /dev/null
@@ -1,366 +0,0 @@
-
-"""XCAP authentication module"""
-
-# XXX this module should be either renamed or refactored as it does more then just auth.
-from hashlib import md5
-from zope.interface import Interface, implements
-
-from twisted.internet import defer
-from twisted.python import failure
-from twisted.cred import credentials, portal, checkers, error as credError
-
-from application.configuration.datatypes import NetworkRangeList
-from application.configuration import ConfigSection, ConfigSetting
-
-import struct
-import socket
-import urllib.parse
-
-import xcap
-from xcap.datatypes import XCAPRootURI
-from xcap.appusage import getApplicationForURI, namespaces, public_get_applications
-from xcap.errors import ResourceNotFound
-from xcap.uri import XCAPUser, XCAPUri
-from xcap.web import http, server, stream, responsecode, http_headers
-from xcap.web.auth import basic, digest
-from xcap.web.auth.wrapper import HTTPAuthResource, UnauthorizedResponse
-
-
-# body of 404 error message to render when user requests xcap-root
-# it's html, because XCAP root is often published on the web.
-# NOTE: there're no plans to convert other error messages to html.
-# Since a web-browser is not the primary tool for accessing XCAP server, text/plain
-# is easier for clients to present to user/save to logs/etc.
-WELCOME = ('
Not Found'
- 'Not Found
XCAP server does not serve anything '
- 'directly under XCAP Root URL. You have to be more specific.'
- '
'
- 'OpenXCAP/%s'
- '') % xcap.__version__
-
-
-class AuthenticationConfig(ConfigSection):
- __cfgfile__ = xcap.__cfgfile__
- __section__ = 'Authentication'
-
- default_realm = ConfigSetting(type=str, value=None)
- trusted_peers = ConfigSetting(type=NetworkRangeList, value=NetworkRangeList('none'))
-
-class ServerConfig(ConfigSection):
- __cfgfile__ = xcap.__cfgfile__
- __section__ = 'Server'
-
- root = ConfigSetting(type=XCAPRootURI, value=None)
-
-
-def generateWWWAuthenticate(headers):
- _generated = []
- for seq in headers:
- scheme, challenge = seq[0], seq[1]
-
- # If we're going to parse out to something other than a dict
- # we need to be able to generate from something other than a dict
-
- try:
- l = []
- for k,v in dict(challenge).items():
- l.append("%s=%s" % (k, k in ("algorithm", "stale") and v or http_headers.quoteString(v)))
-
- _generated.append("%s %s" % (scheme, ", ".join(l)))
- except ValueError:
- _generated.append("%s %s" % (scheme, challenge))
-
- return _generated
-
-http_headers.generator_response_headers["WWW-Authenticate"] = (generateWWWAuthenticate,)
-http_headers.DefaultHTTPHandler.updateGenerators(http_headers.generator_response_headers)
-del generateWWWAuthenticate
-
-def parseNodeURI(node_uri, default_realm):
- """Parses the given Node URI, containing the XCAP root, document selector,
- and node selector, and returns an XCAPUri instance if succesful."""
- xcap_root = None
- for uri in ServerConfig.root.uris:
- if node_uri.startswith(uri):
- xcap_root = uri
- break
- if xcap_root is None:
- raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri)
- resource_selector = node_uri[len(xcap_root):]
- if not resource_selector or resource_selector=='/':
- raise ResourceNotFound(WELCOME, http_headers.MimeType("text", "html"))
- r = XCAPUri(xcap_root, resource_selector, namespaces)
- if r.user.domain is None:
- r.user.domain = default_realm
- return r
-
-
-class ITrustedPeerCredentials(credentials.ICredentials):
- def checkPeer(self, trusted_peers):
- pass
-
-class TrustedPeerCredentials(object):
- implements(ITrustedPeerCredentials)
-
- def __init__(self, peer):
- self.peer = peer
-
- def checkPeer(self, trusted_peers):
- for range in trusted_peers:
- if struct.unpack('!L', socket.inet_aton(self.peer))[0] & range[1] == range[0]:
- return True
- return False
-
-class IPublicGetApplicationCredentials(credentials.ICredentials):
- def checkApplication(self):
- pass
-
-class PublicGetApplicationCredentials(object):
- implements(IPublicGetApplicationCredentials)
-
- def checkApplication(self):
- return True
-
-## credentials checkers
-
-class TrustedPeerChecker(object):
- implements(checkers.ICredentialsChecker)
- credentialInterfaces = (ITrustedPeerCredentials,)
-
- def __init__(self, trusted_peers):
- self.trusted_peers = trusted_peers
-
- def requestAvatarId(self, credentials):
- """Return the avatar ID for the credentials which must have a 'peer' attribute,
- or an UnauthorizedLogin in case of a failure."""
- if credentials.checkPeer(self.trusted_peers):
- return defer.succeed(credentials.peer)
- return defer.fail(credError.UnauthorizedLogin())
-
-class PublicGetApplicationChecker(object):
- implements(checkers.ICredentialsChecker)
- credentialInterfaces = (IPublicGetApplicationCredentials,)
-
- def requestAvatarId(self, credentials):
- """We already know that the method is GET and the application is a 'public GET application',
- we just need to say that the authentication succeeded."""
- if credentials.checkApplication():
- return defer.succeed(None)
- return defer.fail(credError.UnauthorizedLogin())
-
-## avatars
-
-class IAuthUser(Interface):
- pass
-
-class ITrustedPeer(Interface):
- pass
-
-class IPublicGetApplication(Interface):
- pass
-
-class AuthUser(str):
- """Authenticated XCAP User avatar."""
- implements(IAuthUser)
-
-class TrustedPeer(str):
- """Trusted peer avatar."""
- implements(ITrustedPeer)
-
-class PublicGetApplication(str):
- """Public get application avatar."""
- implements(IPublicGetApplication)
-
-## realm
-
-class XCAPAuthRealm(object):
- """XCAP authentication realm. Receives an avatar ID (a string identifying the user)
- and a list of interfaces the avatar needs to support. It returns an avatar that
- encapsulates data about that user."""
- implements(portal.IRealm)
-
- def requestAvatar(self, avatarId, mind, *interfaces):
- if IAuthUser in interfaces:
- return IAuthUser, AuthUser(avatarId)
- elif ITrustedPeer in interfaces:
- return ITrustedPeer, TrustedPeer(avatarId)
- elif IPublicGetApplication in interfaces:
- return IPublicGetApplication, PublicGetApplication(avatarId)
-
- raise NotImplementedError("Only IAuthUser and ITrustedPeer interfaces are supported")
-
-def get_cred(request, default_realm):
- auth = request.headers.getHeader('authorization')
- if auth:
- typ, data = auth
- if typ == 'basic':
- return data.decode('base64').split(':', 1)[0], default_realm
- elif typ == 'digest':
- raise NotImplementedError
- return None, default_realm
-
-## authentication wrapper for XCAP resources
-class XCAPAuthResource(HTTPAuthResource):
-
- def allowedMethods(self):
- return 'GET', 'PUT', 'DELETE'
-
- def _updateRealm(self, realm):
- """Updates the realm of the attached credential factories."""
- for factory in list(self.credentialFactories.values()):
- factory.realm = realm
-
- def authenticate(self, request):
- """Authenticates an XCAP request."""
- parsed_url = urllib.parse.urlparse(request.uri)
- if request.port in (80, 443):
- uri = request.scheme + "://" + request.host + parsed_url.path
- else:
- uri = request.scheme + "://" + request.host + ":" + str(request.port) + parsed_url.path
- if parsed_url.query:
- uri += "?%s" % parsed_url.query
- xcap_uri = parseNodeURI(uri, AuthenticationConfig.default_realm)
- request.xcap_uri = xcap_uri
- if xcap_uri.doc_selector.context=='global':
- return defer.succeed(self.wrappedResource)
-
- ## For each request the authentication realm must be
- ## dinamically deducted from the XCAP request URI
- realm = xcap_uri.user.domain
-
- if realm is None:
- raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)')
-
- if not xcap_uri.user.username:
- # for 'global' requests there's no username@domain in the URI,
- # so we will use username and domain from Authorization header
- xcap_uri.user.username, xcap_uri.user.domain = get_cred(request, AuthenticationConfig.default_realm)
-
- self._updateRealm(realm)
-
- # If we receive a GET to a 'public GET application' we will not authenticate it
- if request.method == "GET" and xcap_uri.application_id in public_get_applications:
- return self.portal.login(PublicGetApplicationCredentials(),
- None,
- IPublicGetApplication
- ).addCallbacks(self._loginSucceeded,
- self._publicGetApplicationLoginFailed,
- (request,), None,
- (request,), None)
-
- remote_addr = request.remoteAddr.host
- if AuthenticationConfig.trusted_peers:
- return self.portal.login(TrustedPeerCredentials(remote_addr),
- None,
- ITrustedPeer
- ).addCallbacks(self._loginSucceeded,
- self._trustedPeerLoginFailed,
- (request,), None,
- (request,), None)
-
- return HTTPAuthResource.authenticate(self, request)
-
- def _trustedPeerLoginFailed(self, result, request):
- """If the peer is not trusted, fallback to HTTP basic/digest authentication."""
- return HTTPAuthResource.authenticate(self, request)
-
- def _publicGetApplicationLoginFailed(self, result, request):
- return HTTPAuthResource.authenticate(self, request)
-
- def _loginSucceeded(self, avatar, request):
- """Authorizes an XCAP request after it has been authenticated."""
-
- interface, avatar_id = avatar ## the avatar is the authenticated XCAP User
- xcap_uri = request.xcap_uri
-
- application = getApplicationForURI(xcap_uri)
-
- if not application:
- raise ResourceNotFound
-
- if interface is IAuthUser and application.is_authorized(XCAPUser.parse(avatar_id), xcap_uri):
- return HTTPAuthResource._loginSucceeded(self, avatar, request)
- elif interface is ITrustedPeer or interface is IPublicGetApplication:
- return HTTPAuthResource._loginSucceeded(self, avatar, request)
- else:
- return failure.Failure(
- http.HTTPError(
- UnauthorizedResponse(
- self.credentialFactories,
- request.remoteAddr)))
-
- def locateChild(self, request, seg):
- """
- Authenticate the request then return the C{self.wrappedResource}
- and the unmodified segments.
- We're not using path location, we want to fall back to the renderHTTP() call.
- """
- #return self.authenticate(request), seg
- return self, server.StopTraversal
-
- def renderHTTP(self, request):
- """
- Authenticate the request then return the result of calling renderHTTP
- on C{self.wrappedResource}
- """
- if request.method not in self.allowedMethods():
- response = http.Response(responsecode.NOT_ALLOWED)
- response.headers.setHeader("allow", self.allowedMethods())
- return response
-
- def _renderResource(resource):
- return resource.renderHTTP(request)
-
- def _finished_reading(ignore, result):
- data = ''.join(result)
- request.attachment = data
- d = self.authenticate(request)
- d.addCallback(_renderResource)
- return d
-
- if request.method in ('PUT', 'DELETE'):
- # we need to authenticate the request after all the attachment stream
- # has been read
- # QQQ DELETE doesn't have any attachments, does it? nor does GET.
- # QQQ Reading attachment when there isn't one won't hurt, will it?
- # QQQ So why don't we just do it all the time for all requests?
- data = []
- d = stream.readStream(request.stream, data.append)
- d.addCallback(_finished_reading, data)
- return d
- else:
- d = self.authenticate(request)
- d.addCallback(_renderResource)
-
- return d
-
-
-class BasicCredentials(credentials.UsernamePassword):
- """Custom Basic Credentials, which support both plain and hashed checks."""
-
- implements(credentials.IUsernamePassword, digest.IUsernameDigestHash)
-
- def __init__(self, username, password, realm):
- credentials.UsernamePassword.__init__(self, username, password)
- self.realm = realm
-
- @property
- def hash(self):
- return md5('{0.username}:{0.realm}:{0.password}'.format(self)).hexdigest()
-
- def checkHash(self, digestHash):
- return digestHash == self.hash
-
-
-class BasicCredentialFactory(basic.BasicCredentialFactory):
- def decode(self, response, request):
- credential = super(BasicCredentialFactory, self).decode(response, request)
- return BasicCredentials(credential.username, credential.password, self.realm)
-
-
-class DigestCredentialFactory(digest.DigestCredentialFactory):
- def generateOpaque(self, nonce, clientip):
- return super(DigestCredentialFactory, self).generateOpaque(nonce=nonce, clientip=clientip or '')
-
- def verifyOpaque(self, opaque, nonce, clientip):
- return super(DigestCredentialFactory, self).verifyOpaque(opaque=opaque, nonce=nonce, clientip=clientip or '')