diff --git a/xcap/authentication/auth.py b/xcap/authentication/auth.py
index d56fe22..ba5da8c 100644
--- a/xcap/authentication/auth.py
+++ b/xcap/authentication/auth.py
@@ -1,240 +1,240 @@
import base64
import hashlib
import socket
import struct
import time
-from typing import Optional
+from typing import Dict, Optional
from uuid import uuid4
from fastapi import HTTPException, Request
from xcap import __version__
from xcap.appusage import ServerConfig as Backend
from xcap.appusage import namespaces, public_get_applications
from xcap.configuration import AuthenticationConfig, ServerConfig
from xcap.errors import ResourceNotFound
from xcap.http_utils import get_client_ip
from xcap.uri import XCAPUri
from xcap.xpath import DocumentSelectorError, NodeParsingError
# In-memory nonce cache with expiration time (for demonstration purposes)
-nonce_cache: dict[int, str] = {}
+nonce_cache: Dict[int, str] = {}
NONCE_EXPIRATION_TIME = 900 # 15 minutes for nonce expiration
WELCOME = ('
Not Found'
'Not Found
XCAP server does not serve anything '
'directly under XCAP Root URL. You have to be more specific.'
'
'
'OpenXCAP/%s'
'') % __version__
def parseNodeURI(node_uri: str, default_realm: str) -> XCAPUri:
"""Parses the given Node URI, containing the XCAP root, document selector,
and node selector, and returns an XCAPUri instance if succesful."""
xcap_root = None
for uri in ServerConfig.root.uris:
if node_uri.startswith(uri):
xcap_root = uri
break
if xcap_root is None:
raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri)
resource_selector = node_uri[len(xcap_root):]
if not resource_selector or resource_selector == '/':
raise ResourceNotFound(WELCOME, "text/html")
try:
r = XCAPUri(xcap_root, resource_selector, namespaces)
except NodeParsingError as e:
raise HTTPException(status_code=e.status_code, detail=e.args)
except DocumentSelectorError as e:
raise HTTPException(status_code=e.status_code, detail=e.args)
if r.user.domain is None:
r.user.domain = default_realm
return r
class Credentials(object):
def __init__(self, username, password=None, realm=None):
self.username = username
self.password = password
self.realm = realm
@property
def hash(self):
return hashlib.md5('{0.username}:{0.realm}:{0.password}'.format(self).encode()).hexdigest()
def checkPassword(self, password: bytes) -> bool:
return self.password == password
def checkHash(self, digestHash):
return digestHash == self.hash
def is_valid(self, user):
if AuthenticationConfig.cleartext_passwords:
return self.checkPassword(user.password)
else:
return self.checkHash(user.ha1)
class AuthenticationManager:
def __init__(self):
self.nonce_cache = nonce_cache
self.trusted_peers = AuthenticationConfig.trusted_peers
# Helper function to generate a nonce
def generate_nonce(self) -> str:
"""Generate a new nonce (typically a random string with a timestamp)."""
timestamp = int(time.time())
unique_nonce = f"{timestamp}-{uuid4()}"
return base64.b64encode(unique_nonce.encode()).decode("utf-8")
# Helper function to create the Digest response hash
async def create_digest_response(self, username: str, nonce: str, uri: str, method: str, realm: str, cnonce: str, nc: str, qop: str) -> str:
credentials = Credentials(username.split('@', 1)[0], realm=realm)
user = await Backend.backend.PasswordChecker().query_user(credentials)
if user is None:
raise HTTPException(status_code=401, detail="Invalid credentials")
ha1 = user[0].ha1
if AuthenticationConfig.cleartext_passwords:
ha1 = credentials.hash
# Compute the ha2 hash (method:uri)
ha2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
# Compute the final response using the formula: MD5(HA1:nonce:nc:cnonce:qop:HA2)
response = hashlib.md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest()
return response
# Function to validate and clean up expired nonces
def validate_nonce(self, nonce: str) -> bool:
"""Check if the nonce is valid and not expired."""
if nonce not in self.nonce_cache:
return False
timestamp, _ = self.nonce_cache[nonce]
current_time = int(time.time())
if current_time - timestamp > NONCE_EXPIRATION_TIME:
# Nonce expired, remove it from the cache
del self.nonce_cache[nonce]
return False
return True
# Digest Authentication Dependency
async def digest_auth(self, request: Request, realm: str) -> None:
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Digest "):
nonce = self.generate_nonce()
opaque = "eee38d7sacbefv2a3450ciny7QMkPqMAFRtzCUYo5tdS"
self.nonce_cache[nonce] = (int(time.time()), opaque) # Store nonce with timestamp
www_authenticate_header = (
f'Digest realm="{realm}", nonce="{nonce}", opaque={opaque}, algorithm=MD5, qop=auth'
)
raise HTTPException(
status_code=401,
detail="Digest authentication required",
headers={"WWW-Authenticate": www_authenticate_header},
)
# Parse the Digest fields from the header
try:
auth_fields = {k: v.strip('"') for k, v in (field.split("=", 1) for field in auth_header[7:].split(", "))}
except ValueError:
raise HTTPException(status_code=401, detail="Invalid Digest authentication format")
required_fields = ["username", "nonce", "response", "uri", "qop", "cnonce", "nc"]
for field in required_fields:
if field not in auth_fields:
raise HTTPException(status_code=401, detail=f"Missing required Digest field: {field}")
username = auth_fields["username"]
nonce = auth_fields["nonce"]
response = auth_fields["response"]
uri = auth_fields['uri']
method = request.method
cnonce = auth_fields['cnonce']
nc = auth_fields['nc']
qop = auth_fields['qop']
# Use the database session to check for user
if not self.validate_nonce(nonce):
raise HTTPException(status_code=401, detail="Invalid or expired nonce")
# Validate the Digest response
expected_response = await self.create_digest_response(username, nonce, uri, method, realm, cnonce, nc, qop)
# expected_response = self.create_digest_response(username, nonce, uri, method, db, realm)
if response != expected_response:
raise HTTPException(status_code=401, detail="Invalid credentials or response")
async def basic_auth(self, request: Request, realm: str) -> None:
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
raise HTTPException(
status_code=401,
detail="Basic authentication required",
headers={"WWW-Authenticate": f"Basic realm=\"{AuthenticationConfig.default_realm}\""}
)
auth_value = auth_header[6:].strip()
decoded_value = base64.b64decode(auth_value).decode("utf-8")
username, password = decoded_value.split(":")
credentials = Credentials(username, password, realm)
user = await Backend.backend.PasswordChecker().query_user(credentials)
if user is None or not credentials.is_valid(user[0]):
raise HTTPException(
status_code=401,
detail="Invalid credentials"
)
# Function to check if the client IP is in the trusted peers list
def is_ip_trusted(self, client_ip: Optional[str]) -> bool:
"""Check if the client IP is in the trusted peers list."""
if not self.trusted_peers or client_ip is None:
return False
# Iterate through each network range in the trusted_parties list
for range in self.trusted_peers:
# Convert the IP address to a 32-bit integer
ip_int = struct.unpack('!L', socket.inet_aton(client_ip))[0]
# Perform the bitwise comparison (IP address & network mask == base address)
if ip_int & range[1] == range[0]:
return True
# If the IP address does not match any range, return False
return False
async def authenticate_request(self, request: Request) -> XCAPUri:
"""Authenticate a request by checking IP and applying Digest or Basic authentication as needed."""
client_ip = get_client_ip(request)
xcap_uri = parseNodeURI(str(request.url), AuthenticationConfig.default_realm)
if xcap_uri.doc_selector.context == 'global':
return xcap_uri
realm = xcap_uri.user.domain
if realm is None:
raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)')
if request.method == "GET" and xcap_uri.application_id in public_get_applications:
return xcap_uri
if self.is_ip_trusted(client_ip):
return xcap_uri
if AuthenticationConfig.type == 'digest':
await self.digest_auth(request, realm)
elif AuthenticationConfig.type == 'basic':
await self.basic_auth(request, realm)
else:
raise ValueError('Invalid authentication type: %r. Please check the configuration.' % AuthenticationConfig.type)
return xcap_uri
diff --git a/xcap/uri.py b/xcap/uri.py
index a1e2586..29914aa 100644
--- a/xcap/uri.py
+++ b/xcap/uri.py
@@ -1,113 +1,113 @@
"""XCAP URI module
http://tools.ietf.org/html/rfc4825#section-6
The algorithm to decode the URI is as following:
* First, percent-decode the whole URI (urllib.unquote)
* Split document selector from node selector (str.split('~~'))
* Then use xpath_tokenizer from lxml to parse the whole node selector
and extract individual steps
Although after doing percent-decoding first, we cannot use s.split('/'),
using lexer from lxml alleviates that fact a bit and produces good results.
A potential problem can arise with URIs that contain [percent-encoded] double quotes.
Here's an example:
/resource-lists/list[@name="friends"]/external[@anchor="/list[@name=%22mkting%22]"]
which would be converted to
/resource-lists/list[@name="friends"]/external[@anchor="/list[@name="mkting"]"]
and that would confuse the parser.
I'm not sure if it's legal to have such URIs, but if it is this module has to be fixed.
Meanwhile, the safe approach is to use "
/resource-lists/list[@name="friends"]/external[@anchor="/list[@name="mkting"]"]
"""
-from typing import Any, Optional, Union
+from typing import Any, Dict, Optional, Union
from urllib.parse import unquote
from xcap.configuration.datatypes import XCAPRootURI
from xcap.xpath import DocumentSelector, NodeSelector
class XCAPUser(object):
def __init__(self, username: Optional[str] = None, domain: Optional[str] = None):
self.username = username
self.domain = domain
@property
def uri(self) -> str:
return 'sip:%s@%s' % (self.username, self.domain)
def __eq__(self, other) -> bool:
return isinstance(other, XCAPUser) and self.uri == other.uri
def __ne__(self, other) -> bool:
return not self.__eq__(other)
def __bool__(self) -> bool:
return bool(self.username) and bool(self.domain)
def __str__(self) -> str:
return "%s@%s" % (self.username, self.domain)
def __repr__(self) -> str:
return 'XCAPUser(%r, %r)' % (self.username, self.domain)
def __hash__(self) -> int:
return hash(self.username)
@classmethod
def parse(cls, user_id: str, default_domain: Optional[str] = None) -> "XCAPUser":
if user_id.startswith("sip:"):
user_id = user_id[4:]
_split = user_id.split('@', 1)
username = _split[0]
if len(_split) == 2:
domain = _split[1]
else:
domain = default_domain if default_domain else ''
return cls(username, domain)
class XCAPUri(object):
"""An XCAP URI containing the XCAP root, document selector and node selector."""
- def __init__(self, xcap_root: XCAPRootURI, resource_selector: str, namespaces: dict[Any, Any]):
+ def __init__(self, xcap_root: XCAPRootURI, resource_selector: str, namespaces: Dict[Any, Any]):
"namespaces maps application id to default namespace"
self.xcap_root = xcap_root
self.resource_selector = unquote(resource_selector)
realm = None
# convention to get the realm if it's not contained in the user ID section
# of the document selector (bad eyebeam)
if self.resource_selector.startswith("@"):
first_slash = self.resource_selector.find("/")
realm = self.resource_selector[1:first_slash]
self.resource_selector = self.resource_selector[first_slash:]
_split = self.resource_selector.split('~~', 1)
doc_selector = _split[0]
self.doc_selector = DocumentSelector(doc_selector)
self.application_id = self.doc_selector.application_id
self.node_selector: Union[NodeSelector, None] = None
if len(_split) == 2:
self.node_selector = NodeSelector(_split[1], namespaces.get(self.application_id))
if self.doc_selector.user_id:
self.user = XCAPUser.parse(self.doc_selector.user_id, realm)
else:
self.user = XCAPUser(None, realm)
def __str__(self) -> str:
return self.xcap_root + self.resource_selector