diff --git a/xcap/authentication/auth.py b/xcap/authentication/auth.py index ab17c5b..b4260d8 100644 --- a/xcap/authentication/auth.py +++ b/xcap/authentication/auth.py @@ -1,241 +1,241 @@ import base64 import hashlib import socket import struct import time from typing import Dict, Optional from uuid import uuid4 from fastapi import HTTPException, Request from xcap import __version__ from xcap.appusage import ServerConfig as Backend from xcap.appusage import namespaces, public_get_applications from xcap.configuration import AuthenticationConfig, ServerConfig from xcap.errors import ResourceNotFound from xcap.http_utils import get_client_ip from xcap.uri import XCAPUri from xcap.xpath import DocumentSelectorError, NodeParsingError # In-memory nonce cache with expiration time (for demonstration purposes) nonce_cache: Dict[int, str] = {} NONCE_EXPIRATION_TIME = 900 # 15 minutes for nonce expiration WELCOME = ('Not Found' '

Not Found

XCAP server does not serve anything ' 'directly under XCAP Root URL. You have to be more specific.' '

' '
OpenXCAP/%s
' '') % __version__ def parseNodeURI(node_uri: str, default_realm: str) -> XCAPUri: """Parses the given Node URI, containing the XCAP root, document selector, and node selector, and returns an XCAPUri instance if succesful.""" xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector == '/': raise ResourceNotFound(WELCOME, "text/html") try: r = XCAPUri(xcap_root, resource_selector, namespaces) except NodeParsingError as e: raise HTTPException(status_code=e.status_code, detail=e.args) except DocumentSelectorError as e: raise HTTPException(status_code=e.status_code, detail=e.args) if r.user.domain is None: r.user.domain = default_realm return r class Credentials(object): def __init__(self, username, password=None, realm=None): self.username = username self.password = password self.realm = realm @property def hash(self): return hashlib.md5('{0.username}:{0.realm}:{0.password}'.format(self).encode()).hexdigest() def checkPassword(self, password: bytes) -> bool: return self.password == password def checkHash(self, digestHash): return digestHash == self.hash def is_valid(self, user): if AuthenticationConfig.cleartext_passwords: return self.checkPassword(user.password) else: return self.checkHash(user.ha1) class AuthenticationManager: def __init__(self): self.nonce_cache = nonce_cache self.trusted_peers = AuthenticationConfig.trusted_peers # Helper function to generate a nonce def generate_nonce(self) -> str: """Generate a new nonce (typically a random string with a timestamp).""" timestamp = int(time.time()) unique_nonce = f"{timestamp}-{uuid4()}" return base64.b64encode(unique_nonce.encode()).decode("utf-8") # Helper function to create the Digest response hash async def create_digest_response(self, username: str, nonce: str, uri: str, method: str, realm: str, cnonce: str, nc: str, qop: str) -> str: credentials = Credentials(username.split('@', 1)[0], realm=realm) user = await Backend.backend.PasswordChecker().query_user(credentials) if user is None: raise HTTPException(status_code=401, detail="Invalid credentials") ha1 = user[0].ha1 if AuthenticationConfig.cleartext_passwords: credentials.password = user[0].password ha1 = credentials.hash # Compute the ha2 hash (method:uri) ha2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest() # Compute the final response using the formula: MD5(HA1:nonce:nc:cnonce:qop:HA2) response = hashlib.md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest() return response # Function to validate and clean up expired nonces def validate_nonce(self, nonce: str) -> bool: """Check if the nonce is valid and not expired.""" if nonce not in self.nonce_cache: return False timestamp, _ = self.nonce_cache[nonce] current_time = int(time.time()) if current_time - timestamp > NONCE_EXPIRATION_TIME: # Nonce expired, remove it from the cache del self.nonce_cache[nonce] return False return True # Digest Authentication Dependency async def digest_auth(self, request: Request, realm: str) -> None: auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Digest "): nonce = self.generate_nonce() opaque = "eee38d7sacbefv2a3450ciny7QMkPqMAFRtzCUYo5tdS" self.nonce_cache[nonce] = (int(time.time()), opaque) # Store nonce with timestamp www_authenticate_header = ( f'Digest realm="{realm}", nonce="{nonce}", opaque={opaque}, algorithm=MD5, qop=auth' ) raise HTTPException( status_code=401, detail="Digest authentication required", headers={"WWW-Authenticate": www_authenticate_header}, ) # Parse the Digest fields from the header try: auth_fields = {k: v.strip('"') for k, v in (field.split("=", 1) for field in auth_header[7:].split(", "))} except ValueError: raise HTTPException(status_code=401, detail="Invalid Digest authentication format") required_fields = ["username", "nonce", "response", "uri", "qop", "cnonce", "nc"] for field in required_fields: if field not in auth_fields: raise HTTPException(status_code=401, detail=f"Missing required Digest field: {field}") username = auth_fields["username"] nonce = auth_fields["nonce"] response = auth_fields["response"] uri = auth_fields['uri'] method = request.method cnonce = auth_fields['cnonce'] nc = auth_fields['nc'] qop = auth_fields['qop'] # Use the database session to check for user if not self.validate_nonce(nonce): raise HTTPException(status_code=401, detail="Invalid or expired nonce") # Validate the Digest response expected_response = await self.create_digest_response(username, nonce, uri, method, realm, cnonce, nc, qop) # expected_response = self.create_digest_response(username, nonce, uri, method, db, realm) if response != expected_response: raise HTTPException(status_code=401, detail="Invalid credentials or response") async def basic_auth(self, request: Request, realm: str) -> None: auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Basic "): raise HTTPException( status_code=401, detail="Basic authentication required", - headers={"WWW-Authenticate": f"Basic realm=\"{AuthenticationConfig.default_realm}\""} + headers={"WWW-Authenticate": f"Basic realm=\"{realm}\""} ) auth_value = auth_header[6:].strip() decoded_value = base64.b64decode(auth_value).decode("utf-8") username, password = decoded_value.split(":") credentials = Credentials(username, password, realm) user = await Backend.backend.PasswordChecker().query_user(credentials) if user is None or not credentials.is_valid(user[0]): raise HTTPException( status_code=401, detail="Invalid credentials" ) # Function to check if the client IP is in the trusted peers list def is_ip_trusted(self, client_ip: Optional[str]) -> bool: """Check if the client IP is in the trusted peers list.""" if not self.trusted_peers or client_ip is None: return False # Iterate through each network range in the trusted_parties list for range in self.trusted_peers: # Convert the IP address to a 32-bit integer ip_int = struct.unpack('!L', socket.inet_aton(client_ip))[0] # Perform the bitwise comparison (IP address & network mask == base address) if ip_int & range[1] == range[0]: return True # If the IP address does not match any range, return False return False async def authenticate_request(self, request: Request) -> XCAPUri: """Authenticate a request by checking IP and applying Digest or Basic authentication as needed.""" client_ip = get_client_ip(request) xcap_uri = parseNodeURI(str(request.url), AuthenticationConfig.default_realm) if xcap_uri.doc_selector.context == 'global': return xcap_uri realm = xcap_uri.user.domain if realm is None: raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)') if request.method == "GET" and xcap_uri.application_id in public_get_applications: return xcap_uri if self.is_ip_trusted(client_ip): return xcap_uri if AuthenticationConfig.type == 'digest': await self.digest_auth(request, realm) elif AuthenticationConfig.type == 'basic': await self.basic_auth(request, realm) else: raise ValueError('Invalid authentication type: %r. Please check the configuration.' % AuthenticationConfig.type) return xcap_uri