diff --git a/xcap/authentication.py b/xcap/authentication.py deleted file mode 100644 index f0c154f..0000000 --- a/xcap/authentication.py +++ /dev/null @@ -1,366 +0,0 @@ - -"""XCAP authentication module""" - -# XXX this module should be either renamed or refactored as it does more then just auth. -from hashlib import md5 -from zope.interface import Interface, implements - -from twisted.internet import defer -from twisted.python import failure -from twisted.cred import credentials, portal, checkers, error as credError - -from application.configuration.datatypes import NetworkRangeList -from application.configuration import ConfigSection, ConfigSetting - -import struct -import socket -import urllib.parse - -import xcap -from xcap.datatypes import XCAPRootURI -from xcap.appusage import getApplicationForURI, namespaces, public_get_applications -from xcap.errors import ResourceNotFound -from xcap.uri import XCAPUser, XCAPUri -from xcap.web import http, server, stream, responsecode, http_headers -from xcap.web.auth import basic, digest -from xcap.web.auth.wrapper import HTTPAuthResource, UnauthorizedResponse - - -# body of 404 error message to render when user requests xcap-root -# it's html, because XCAP root is often published on the web. -# NOTE: there're no plans to convert other error messages to html. -# Since a web-browser is not the primary tool for accessing XCAP server, text/plain -# is easier for clients to present to user/save to logs/etc. -WELCOME = ('Not Found' - '

Not Found

XCAP server does not serve anything ' - 'directly under XCAP Root URL. You have to be more specific.' - '

' - '
OpenXCAP/%s
' - '') % xcap.__version__ - - -class AuthenticationConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Authentication' - - default_realm = ConfigSetting(type=str, value=None) - trusted_peers = ConfigSetting(type=NetworkRangeList, value=NetworkRangeList('none')) - -class ServerConfig(ConfigSection): - __cfgfile__ = xcap.__cfgfile__ - __section__ = 'Server' - - root = ConfigSetting(type=XCAPRootURI, value=None) - - -def generateWWWAuthenticate(headers): - _generated = [] - for seq in headers: - scheme, challenge = seq[0], seq[1] - - # If we're going to parse out to something other than a dict - # we need to be able to generate from something other than a dict - - try: - l = [] - for k,v in dict(challenge).items(): - l.append("%s=%s" % (k, k in ("algorithm", "stale") and v or http_headers.quoteString(v))) - - _generated.append("%s %s" % (scheme, ", ".join(l))) - except ValueError: - _generated.append("%s %s" % (scheme, challenge)) - - return _generated - -http_headers.generator_response_headers["WWW-Authenticate"] = (generateWWWAuthenticate,) -http_headers.DefaultHTTPHandler.updateGenerators(http_headers.generator_response_headers) -del generateWWWAuthenticate - -def parseNodeURI(node_uri, default_realm): - """Parses the given Node URI, containing the XCAP root, document selector, - and node selector, and returns an XCAPUri instance if succesful.""" - xcap_root = None - for uri in ServerConfig.root.uris: - if node_uri.startswith(uri): - xcap_root = uri - break - if xcap_root is None: - raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri) - resource_selector = node_uri[len(xcap_root):] - if not resource_selector or resource_selector=='/': - raise ResourceNotFound(WELCOME, http_headers.MimeType("text", "html")) - r = XCAPUri(xcap_root, resource_selector, namespaces) - if r.user.domain is None: - r.user.domain = default_realm - return r - - -class ITrustedPeerCredentials(credentials.ICredentials): - def checkPeer(self, trusted_peers): - pass - -class TrustedPeerCredentials(object): - implements(ITrustedPeerCredentials) - - def __init__(self, peer): - self.peer = peer - - def checkPeer(self, trusted_peers): - for range in trusted_peers: - if struct.unpack('!L', socket.inet_aton(self.peer))[0] & range[1] == range[0]: - return True - return False - -class IPublicGetApplicationCredentials(credentials.ICredentials): - def checkApplication(self): - pass - -class PublicGetApplicationCredentials(object): - implements(IPublicGetApplicationCredentials) - - def checkApplication(self): - return True - -## credentials checkers - -class TrustedPeerChecker(object): - implements(checkers.ICredentialsChecker) - credentialInterfaces = (ITrustedPeerCredentials,) - - def __init__(self, trusted_peers): - self.trusted_peers = trusted_peers - - def requestAvatarId(self, credentials): - """Return the avatar ID for the credentials which must have a 'peer' attribute, - or an UnauthorizedLogin in case of a failure.""" - if credentials.checkPeer(self.trusted_peers): - return defer.succeed(credentials.peer) - return defer.fail(credError.UnauthorizedLogin()) - -class PublicGetApplicationChecker(object): - implements(checkers.ICredentialsChecker) - credentialInterfaces = (IPublicGetApplicationCredentials,) - - def requestAvatarId(self, credentials): - """We already know that the method is GET and the application is a 'public GET application', - we just need to say that the authentication succeeded.""" - if credentials.checkApplication(): - return defer.succeed(None) - return defer.fail(credError.UnauthorizedLogin()) - -## avatars - -class IAuthUser(Interface): - pass - -class ITrustedPeer(Interface): - pass - -class IPublicGetApplication(Interface): - pass - -class AuthUser(str): - """Authenticated XCAP User avatar.""" - implements(IAuthUser) - -class TrustedPeer(str): - """Trusted peer avatar.""" - implements(ITrustedPeer) - -class PublicGetApplication(str): - """Public get application avatar.""" - implements(IPublicGetApplication) - -## realm - -class XCAPAuthRealm(object): - """XCAP authentication realm. Receives an avatar ID (a string identifying the user) - and a list of interfaces the avatar needs to support. It returns an avatar that - encapsulates data about that user.""" - implements(portal.IRealm) - - def requestAvatar(self, avatarId, mind, *interfaces): - if IAuthUser in interfaces: - return IAuthUser, AuthUser(avatarId) - elif ITrustedPeer in interfaces: - return ITrustedPeer, TrustedPeer(avatarId) - elif IPublicGetApplication in interfaces: - return IPublicGetApplication, PublicGetApplication(avatarId) - - raise NotImplementedError("Only IAuthUser and ITrustedPeer interfaces are supported") - -def get_cred(request, default_realm): - auth = request.headers.getHeader('authorization') - if auth: - typ, data = auth - if typ == 'basic': - return data.decode('base64').split(':', 1)[0], default_realm - elif typ == 'digest': - raise NotImplementedError - return None, default_realm - -## authentication wrapper for XCAP resources -class XCAPAuthResource(HTTPAuthResource): - - def allowedMethods(self): - return 'GET', 'PUT', 'DELETE' - - def _updateRealm(self, realm): - """Updates the realm of the attached credential factories.""" - for factory in list(self.credentialFactories.values()): - factory.realm = realm - - def authenticate(self, request): - """Authenticates an XCAP request.""" - parsed_url = urllib.parse.urlparse(request.uri) - if request.port in (80, 443): - uri = request.scheme + "://" + request.host + parsed_url.path - else: - uri = request.scheme + "://" + request.host + ":" + str(request.port) + parsed_url.path - if parsed_url.query: - uri += "?%s" % parsed_url.query - xcap_uri = parseNodeURI(uri, AuthenticationConfig.default_realm) - request.xcap_uri = xcap_uri - if xcap_uri.doc_selector.context=='global': - return defer.succeed(self.wrappedResource) - - ## For each request the authentication realm must be - ## dinamically deducted from the XCAP request URI - realm = xcap_uri.user.domain - - if realm is None: - raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)') - - if not xcap_uri.user.username: - # for 'global' requests there's no username@domain in the URI, - # so we will use username and domain from Authorization header - xcap_uri.user.username, xcap_uri.user.domain = get_cred(request, AuthenticationConfig.default_realm) - - self._updateRealm(realm) - - # If we receive a GET to a 'public GET application' we will not authenticate it - if request.method == "GET" and xcap_uri.application_id in public_get_applications: - return self.portal.login(PublicGetApplicationCredentials(), - None, - IPublicGetApplication - ).addCallbacks(self._loginSucceeded, - self._publicGetApplicationLoginFailed, - (request,), None, - (request,), None) - - remote_addr = request.remoteAddr.host - if AuthenticationConfig.trusted_peers: - return self.portal.login(TrustedPeerCredentials(remote_addr), - None, - ITrustedPeer - ).addCallbacks(self._loginSucceeded, - self._trustedPeerLoginFailed, - (request,), None, - (request,), None) - - return HTTPAuthResource.authenticate(self, request) - - def _trustedPeerLoginFailed(self, result, request): - """If the peer is not trusted, fallback to HTTP basic/digest authentication.""" - return HTTPAuthResource.authenticate(self, request) - - def _publicGetApplicationLoginFailed(self, result, request): - return HTTPAuthResource.authenticate(self, request) - - def _loginSucceeded(self, avatar, request): - """Authorizes an XCAP request after it has been authenticated.""" - - interface, avatar_id = avatar ## the avatar is the authenticated XCAP User - xcap_uri = request.xcap_uri - - application = getApplicationForURI(xcap_uri) - - if not application: - raise ResourceNotFound - - if interface is IAuthUser and application.is_authorized(XCAPUser.parse(avatar_id), xcap_uri): - return HTTPAuthResource._loginSucceeded(self, avatar, request) - elif interface is ITrustedPeer or interface is IPublicGetApplication: - return HTTPAuthResource._loginSucceeded(self, avatar, request) - else: - return failure.Failure( - http.HTTPError( - UnauthorizedResponse( - self.credentialFactories, - request.remoteAddr))) - - def locateChild(self, request, seg): - """ - Authenticate the request then return the C{self.wrappedResource} - and the unmodified segments. - We're not using path location, we want to fall back to the renderHTTP() call. - """ - #return self.authenticate(request), seg - return self, server.StopTraversal - - def renderHTTP(self, request): - """ - Authenticate the request then return the result of calling renderHTTP - on C{self.wrappedResource} - """ - if request.method not in self.allowedMethods(): - response = http.Response(responsecode.NOT_ALLOWED) - response.headers.setHeader("allow", self.allowedMethods()) - return response - - def _renderResource(resource): - return resource.renderHTTP(request) - - def _finished_reading(ignore, result): - data = ''.join(result) - request.attachment = data - d = self.authenticate(request) - d.addCallback(_renderResource) - return d - - if request.method in ('PUT', 'DELETE'): - # we need to authenticate the request after all the attachment stream - # has been read - # QQQ DELETE doesn't have any attachments, does it? nor does GET. - # QQQ Reading attachment when there isn't one won't hurt, will it? - # QQQ So why don't we just do it all the time for all requests? - data = [] - d = stream.readStream(request.stream, data.append) - d.addCallback(_finished_reading, data) - return d - else: - d = self.authenticate(request) - d.addCallback(_renderResource) - - return d - - -class BasicCredentials(credentials.UsernamePassword): - """Custom Basic Credentials, which support both plain and hashed checks.""" - - implements(credentials.IUsernamePassword, digest.IUsernameDigestHash) - - def __init__(self, username, password, realm): - credentials.UsernamePassword.__init__(self, username, password) - self.realm = realm - - @property - def hash(self): - return md5('{0.username}:{0.realm}:{0.password}'.format(self)).hexdigest() - - def checkHash(self, digestHash): - return digestHash == self.hash - - -class BasicCredentialFactory(basic.BasicCredentialFactory): - def decode(self, response, request): - credential = super(BasicCredentialFactory, self).decode(response, request) - return BasicCredentials(credential.username, credential.password, self.realm) - - -class DigestCredentialFactory(digest.DigestCredentialFactory): - def generateOpaque(self, nonce, clientip): - return super(DigestCredentialFactory, self).generateOpaque(nonce=nonce, clientip=clientip or '') - - def verifyOpaque(self, opaque, nonce, clientip): - return super(DigestCredentialFactory, self).verifyOpaque(opaque=opaque, nonce=nonce, clientip=clientip or '')