diff --git a/xcap/authentication/auth.py b/xcap/authentication/auth.py index d56fe22..ba5da8c 100644 --- a/xcap/authentication/auth.py +++ b/xcap/authentication/auth.py @@ -1,240 +1,240 @@ import base64 import hashlib import socket import struct import time -from typing import Optional +from typing import Dict, Optional from uuid import uuid4 from fastapi import HTTPException, Request from xcap import __version__ from xcap.appusage import ServerConfig as Backend from xcap.appusage import namespaces, public_get_applications from xcap.configuration import AuthenticationConfig, ServerConfig from xcap.errors import ResourceNotFound from xcap.http_utils import get_client_ip from xcap.uri import XCAPUri from xcap.xpath import DocumentSelectorError, NodeParsingError # In-memory nonce cache with expiration time (for demonstration purposes) -nonce_cache: dict[int, str] = {} +nonce_cache: Dict[int, str] = {} NONCE_EXPIRATION_TIME = 900 # 15 minutes for nonce expiration WELCOME = ('Not Found' '

Not Found

XCAP server does not serve anything ' 'directly under XCAP Root URL. You have to be more specific.' '

' '
OpenXCAP/%s
' '') % __version__ def parseNodeURI(node_uri: str, default_realm: str) -> XCAPUri: """Parses the given Node URI, containing the XCAP root, document selector, and node selector, and returns an XCAPUri instance if succesful.""" xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector == '/': raise ResourceNotFound(WELCOME, "text/html") try: r = XCAPUri(xcap_root, resource_selector, namespaces) except NodeParsingError as e: raise HTTPException(status_code=e.status_code, detail=e.args) except DocumentSelectorError as e: raise HTTPException(status_code=e.status_code, detail=e.args) if r.user.domain is None: r.user.domain = default_realm return r class Credentials(object): def __init__(self, username, password=None, realm=None): self.username = username self.password = password self.realm = realm @property def hash(self): return hashlib.md5('{0.username}:{0.realm}:{0.password}'.format(self).encode()).hexdigest() def checkPassword(self, password: bytes) -> bool: return self.password == password def checkHash(self, digestHash): return digestHash == self.hash def is_valid(self, user): if AuthenticationConfig.cleartext_passwords: return self.checkPassword(user.password) else: return self.checkHash(user.ha1) class AuthenticationManager: def __init__(self): self.nonce_cache = nonce_cache self.trusted_peers = AuthenticationConfig.trusted_peers # Helper function to generate a nonce def generate_nonce(self) -> str: """Generate a new nonce (typically a random string with a timestamp).""" timestamp = int(time.time()) unique_nonce = f"{timestamp}-{uuid4()}" return base64.b64encode(unique_nonce.encode()).decode("utf-8") # Helper function to create the Digest response hash async def create_digest_response(self, username: str, nonce: str, uri: str, method: str, realm: str, cnonce: str, nc: str, qop: str) -> str: credentials = Credentials(username.split('@', 1)[0], realm=realm) user = await Backend.backend.PasswordChecker().query_user(credentials) if user is None: raise HTTPException(status_code=401, detail="Invalid credentials") ha1 = user[0].ha1 if AuthenticationConfig.cleartext_passwords: ha1 = credentials.hash # Compute the ha2 hash (method:uri) ha2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest() # Compute the final response using the formula: MD5(HA1:nonce:nc:cnonce:qop:HA2) response = hashlib.md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest() return response # Function to validate and clean up expired nonces def validate_nonce(self, nonce: str) -> bool: """Check if the nonce is valid and not expired.""" if nonce not in self.nonce_cache: return False timestamp, _ = self.nonce_cache[nonce] current_time = int(time.time()) if current_time - timestamp > NONCE_EXPIRATION_TIME: # Nonce expired, remove it from the cache del self.nonce_cache[nonce] return False return True # Digest Authentication Dependency async def digest_auth(self, request: Request, realm: str) -> None: auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Digest "): nonce = self.generate_nonce() opaque = "eee38d7sacbefv2a3450ciny7QMkPqMAFRtzCUYo5tdS" self.nonce_cache[nonce] = (int(time.time()), opaque) # Store nonce with timestamp www_authenticate_header = ( f'Digest realm="{realm}", nonce="{nonce}", opaque={opaque}, algorithm=MD5, qop=auth' ) raise HTTPException( status_code=401, detail="Digest authentication required", headers={"WWW-Authenticate": www_authenticate_header}, ) # Parse the Digest fields from the header try: auth_fields = {k: v.strip('"') for k, v in (field.split("=", 1) for field in auth_header[7:].split(", "))} except ValueError: raise HTTPException(status_code=401, detail="Invalid Digest authentication format") required_fields = ["username", "nonce", "response", "uri", "qop", "cnonce", "nc"] for field in required_fields: if field not in auth_fields: raise HTTPException(status_code=401, detail=f"Missing required Digest field: {field}") username = auth_fields["username"] nonce = auth_fields["nonce"] response = auth_fields["response"] uri = auth_fields['uri'] method = request.method cnonce = auth_fields['cnonce'] nc = auth_fields['nc'] qop = auth_fields['qop'] # Use the database session to check for user if not self.validate_nonce(nonce): raise HTTPException(status_code=401, detail="Invalid or expired nonce") # Validate the Digest response expected_response = await self.create_digest_response(username, nonce, uri, method, realm, cnonce, nc, qop) # expected_response = self.create_digest_response(username, nonce, uri, method, db, realm) if response != expected_response: raise HTTPException(status_code=401, detail="Invalid credentials or response") async def basic_auth(self, request: Request, realm: str) -> None: auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Basic "): raise HTTPException( status_code=401, detail="Basic authentication required", headers={"WWW-Authenticate": f"Basic realm=\"{AuthenticationConfig.default_realm}\""} ) auth_value = auth_header[6:].strip() decoded_value = base64.b64decode(auth_value).decode("utf-8") username, password = decoded_value.split(":") credentials = Credentials(username, password, realm) user = await Backend.backend.PasswordChecker().query_user(credentials) if user is None or not credentials.is_valid(user[0]): raise HTTPException( status_code=401, detail="Invalid credentials" ) # Function to check if the client IP is in the trusted peers list def is_ip_trusted(self, client_ip: Optional[str]) -> bool: """Check if the client IP is in the trusted peers list.""" if not self.trusted_peers or client_ip is None: return False # Iterate through each network range in the trusted_parties list for range in self.trusted_peers: # Convert the IP address to a 32-bit integer ip_int = struct.unpack('!L', socket.inet_aton(client_ip))[0] # Perform the bitwise comparison (IP address & network mask == base address) if ip_int & range[1] == range[0]: return True # If the IP address does not match any range, return False return False async def authenticate_request(self, request: Request) -> XCAPUri: """Authenticate a request by checking IP and applying Digest or Basic authentication as needed.""" client_ip = get_client_ip(request) xcap_uri = parseNodeURI(str(request.url), AuthenticationConfig.default_realm) if xcap_uri.doc_selector.context == 'global': return xcap_uri realm = xcap_uri.user.domain if realm is None: raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)') if request.method == "GET" and xcap_uri.application_id in public_get_applications: return xcap_uri if self.is_ip_trusted(client_ip): return xcap_uri if AuthenticationConfig.type == 'digest': await self.digest_auth(request, realm) elif AuthenticationConfig.type == 'basic': await self.basic_auth(request, realm) else: raise ValueError('Invalid authentication type: %r. Please check the configuration.' % AuthenticationConfig.type) return xcap_uri diff --git a/xcap/uri.py b/xcap/uri.py index a1e2586..29914aa 100644 --- a/xcap/uri.py +++ b/xcap/uri.py @@ -1,113 +1,113 @@ """XCAP URI module http://tools.ietf.org/html/rfc4825#section-6 The algorithm to decode the URI is as following: * First, percent-decode the whole URI (urllib.unquote) * Split document selector from node selector (str.split('~~')) * Then use xpath_tokenizer from lxml to parse the whole node selector and extract individual steps Although after doing percent-decoding first, we cannot use s.split('/'), using lexer from lxml alleviates that fact a bit and produces good results. A potential problem can arise with URIs that contain [percent-encoded] double quotes. Here's an example: /resource-lists/list[@name="friends"]/external[@anchor="/list[@name=%22mkting%22]"] which would be converted to /resource-lists/list[@name="friends"]/external[@anchor="/list[@name="mkting"]"] and that would confuse the parser. I'm not sure if it's legal to have such URIs, but if it is this module has to be fixed. Meanwhile, the safe approach is to use " /resource-lists/list[@name="friends"]/external[@anchor="/list[@name="mkting"]"] """ -from typing import Any, Optional, Union +from typing import Any, Dict, Optional, Union from urllib.parse import unquote from xcap.configuration.datatypes import XCAPRootURI from xcap.xpath import DocumentSelector, NodeSelector class XCAPUser(object): def __init__(self, username: Optional[str] = None, domain: Optional[str] = None): self.username = username self.domain = domain @property def uri(self) -> str: return 'sip:%s@%s' % (self.username, self.domain) def __eq__(self, other) -> bool: return isinstance(other, XCAPUser) and self.uri == other.uri def __ne__(self, other) -> bool: return not self.__eq__(other) def __bool__(self) -> bool: return bool(self.username) and bool(self.domain) def __str__(self) -> str: return "%s@%s" % (self.username, self.domain) def __repr__(self) -> str: return 'XCAPUser(%r, %r)' % (self.username, self.domain) def __hash__(self) -> int: return hash(self.username) @classmethod def parse(cls, user_id: str, default_domain: Optional[str] = None) -> "XCAPUser": if user_id.startswith("sip:"): user_id = user_id[4:] _split = user_id.split('@', 1) username = _split[0] if len(_split) == 2: domain = _split[1] else: domain = default_domain if default_domain else '' return cls(username, domain) class XCAPUri(object): """An XCAP URI containing the XCAP root, document selector and node selector.""" - def __init__(self, xcap_root: XCAPRootURI, resource_selector: str, namespaces: dict[Any, Any]): + def __init__(self, xcap_root: XCAPRootURI, resource_selector: str, namespaces: Dict[Any, Any]): "namespaces maps application id to default namespace" self.xcap_root = xcap_root self.resource_selector = unquote(resource_selector) realm = None # convention to get the realm if it's not contained in the user ID section # of the document selector (bad eyebeam) if self.resource_selector.startswith("@"): first_slash = self.resource_selector.find("/") realm = self.resource_selector[1:first_slash] self.resource_selector = self.resource_selector[first_slash:] _split = self.resource_selector.split('~~', 1) doc_selector = _split[0] self.doc_selector = DocumentSelector(doc_selector) self.application_id = self.doc_selector.application_id self.node_selector: Union[NodeSelector, None] = None if len(_split) == 2: self.node_selector = NodeSelector(_split[1], namespaces.get(self.application_id)) if self.doc_selector.user_id: self.user = XCAPUser.parse(self.doc_selector.user_id, realm) else: self.user = XCAPUser(None, realm) def __str__(self) -> str: return self.xcap_root + self.resource_selector