diff --git a/xcap/authentication/__init__.py b/xcap/authentication/__init__.py
new file mode 100644
index 0000000..add8b07
--- /dev/null
+++ b/xcap/authentication/__init__.py
@@ -0,0 +1 @@
+from .auth import AuthenticationManager
diff --git a/xcap/authentication/auth.py b/xcap/authentication/auth.py
new file mode 100644
index 0000000..d56fe22
--- /dev/null
+++ b/xcap/authentication/auth.py
@@ -0,0 +1,240 @@
+import base64
+import hashlib
+import socket
+import struct
+import time
+from typing import Optional
+from uuid import uuid4
+
+from fastapi import HTTPException, Request
+
+from xcap import __version__
+from xcap.appusage import ServerConfig as Backend
+from xcap.appusage import namespaces, public_get_applications
+from xcap.configuration import AuthenticationConfig, ServerConfig
+from xcap.errors import ResourceNotFound
+from xcap.http_utils import get_client_ip
+from xcap.uri import XCAPUri
+from xcap.xpath import DocumentSelectorError, NodeParsingError
+
+# In-memory nonce cache with expiration time (for demonstration purposes)
+nonce_cache: dict[int, str] = {}
+NONCE_EXPIRATION_TIME = 900 # 15 minutes for nonce expiration
+
+WELCOME = ('
Not Found'
+ 'Not Found
XCAP server does not serve anything '
+ 'directly under XCAP Root URL. You have to be more specific.'
+ '
'
+ 'OpenXCAP/%s'
+ '') % __version__
+
+
+def parseNodeURI(node_uri: str, default_realm: str) -> XCAPUri:
+ """Parses the given Node URI, containing the XCAP root, document selector,
+ and node selector, and returns an XCAPUri instance if succesful."""
+ xcap_root = None
+ for uri in ServerConfig.root.uris:
+ if node_uri.startswith(uri):
+ xcap_root = uri
+ break
+ if xcap_root is None:
+ raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri)
+ resource_selector = node_uri[len(xcap_root):]
+ if not resource_selector or resource_selector == '/':
+ raise ResourceNotFound(WELCOME, "text/html")
+
+ try:
+ r = XCAPUri(xcap_root, resource_selector, namespaces)
+ except NodeParsingError as e:
+ raise HTTPException(status_code=e.status_code, detail=e.args)
+ except DocumentSelectorError as e:
+ raise HTTPException(status_code=e.status_code, detail=e.args)
+ if r.user.domain is None:
+ r.user.domain = default_realm
+ return r
+
+
+class Credentials(object):
+ def __init__(self, username, password=None, realm=None):
+ self.username = username
+ self.password = password
+ self.realm = realm
+
+ @property
+ def hash(self):
+ return hashlib.md5('{0.username}:{0.realm}:{0.password}'.format(self).encode()).hexdigest()
+
+ def checkPassword(self, password: bytes) -> bool:
+ return self.password == password
+
+ def checkHash(self, digestHash):
+ return digestHash == self.hash
+
+ def is_valid(self, user):
+ if AuthenticationConfig.cleartext_passwords:
+ return self.checkPassword(user.password)
+ else:
+ return self.checkHash(user.ha1)
+
+
+class AuthenticationManager:
+ def __init__(self):
+ self.nonce_cache = nonce_cache
+ self.trusted_peers = AuthenticationConfig.trusted_peers
+
+ # Helper function to generate a nonce
+ def generate_nonce(self) -> str:
+ """Generate a new nonce (typically a random string with a timestamp)."""
+ timestamp = int(time.time())
+ unique_nonce = f"{timestamp}-{uuid4()}"
+ return base64.b64encode(unique_nonce.encode()).decode("utf-8")
+
+ # Helper function to create the Digest response hash
+ async def create_digest_response(self, username: str, nonce: str, uri: str, method: str, realm: str, cnonce: str, nc: str, qop: str) -> str:
+ credentials = Credentials(username.split('@', 1)[0], realm=realm)
+ user = await Backend.backend.PasswordChecker().query_user(credentials)
+
+ if user is None:
+ raise HTTPException(status_code=401, detail="Invalid credentials")
+
+ ha1 = user[0].ha1
+ if AuthenticationConfig.cleartext_passwords:
+ ha1 = credentials.hash
+
+ # Compute the ha2 hash (method:uri)
+ ha2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
+
+ # Compute the final response using the formula: MD5(HA1:nonce:nc:cnonce:qop:HA2)
+ response = hashlib.md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest()
+
+ return response
+
+ # Function to validate and clean up expired nonces
+ def validate_nonce(self, nonce: str) -> bool:
+ """Check if the nonce is valid and not expired."""
+ if nonce not in self.nonce_cache:
+ return False
+ timestamp, _ = self.nonce_cache[nonce]
+ current_time = int(time.time())
+ if current_time - timestamp > NONCE_EXPIRATION_TIME:
+ # Nonce expired, remove it from the cache
+ del self.nonce_cache[nonce]
+ return False
+ return True
+
+ # Digest Authentication Dependency
+ async def digest_auth(self, request: Request, realm: str) -> None:
+ auth_header = request.headers.get("Authorization")
+
+ if not auth_header or not auth_header.startswith("Digest "):
+ nonce = self.generate_nonce()
+ opaque = "eee38d7sacbefv2a3450ciny7QMkPqMAFRtzCUYo5tdS"
+
+ self.nonce_cache[nonce] = (int(time.time()), opaque) # Store nonce with timestamp
+ www_authenticate_header = (
+ f'Digest realm="{realm}", nonce="{nonce}", opaque={opaque}, algorithm=MD5, qop=auth'
+ )
+ raise HTTPException(
+ status_code=401,
+ detail="Digest authentication required",
+ headers={"WWW-Authenticate": www_authenticate_header},
+ )
+
+ # Parse the Digest fields from the header
+ try:
+ auth_fields = {k: v.strip('"') for k, v in (field.split("=", 1) for field in auth_header[7:].split(", "))}
+ except ValueError:
+ raise HTTPException(status_code=401, detail="Invalid Digest authentication format")
+
+ required_fields = ["username", "nonce", "response", "uri", "qop", "cnonce", "nc"]
+ for field in required_fields:
+ if field not in auth_fields:
+ raise HTTPException(status_code=401, detail=f"Missing required Digest field: {field}")
+
+ username = auth_fields["username"]
+ nonce = auth_fields["nonce"]
+ response = auth_fields["response"]
+ uri = auth_fields['uri']
+ method = request.method
+ cnonce = auth_fields['cnonce']
+ nc = auth_fields['nc']
+ qop = auth_fields['qop']
+
+ # Use the database session to check for user
+ if not self.validate_nonce(nonce):
+ raise HTTPException(status_code=401, detail="Invalid or expired nonce")
+
+ # Validate the Digest response
+ expected_response = await self.create_digest_response(username, nonce, uri, method, realm, cnonce, nc, qop)
+ # expected_response = self.create_digest_response(username, nonce, uri, method, db, realm)
+
+ if response != expected_response:
+ raise HTTPException(status_code=401, detail="Invalid credentials or response")
+
+ async def basic_auth(self, request: Request, realm: str) -> None:
+ auth_header = request.headers.get("Authorization")
+ if not auth_header or not auth_header.startswith("Basic "):
+ raise HTTPException(
+ status_code=401,
+ detail="Basic authentication required",
+ headers={"WWW-Authenticate": f"Basic realm=\"{AuthenticationConfig.default_realm}\""}
+ )
+
+ auth_value = auth_header[6:].strip()
+ decoded_value = base64.b64decode(auth_value).decode("utf-8")
+ username, password = decoded_value.split(":")
+ credentials = Credentials(username, password, realm)
+
+ user = await Backend.backend.PasswordChecker().query_user(credentials)
+
+ if user is None or not credentials.is_valid(user[0]):
+ raise HTTPException(
+ status_code=401,
+ detail="Invalid credentials"
+ )
+
+ # Function to check if the client IP is in the trusted peers list
+ def is_ip_trusted(self, client_ip: Optional[str]) -> bool:
+ """Check if the client IP is in the trusted peers list."""
+ if not self.trusted_peers or client_ip is None:
+ return False
+
+ # Iterate through each network range in the trusted_parties list
+ for range in self.trusted_peers:
+ # Convert the IP address to a 32-bit integer
+ ip_int = struct.unpack('!L', socket.inet_aton(client_ip))[0]
+
+ # Perform the bitwise comparison (IP address & network mask == base address)
+ if ip_int & range[1] == range[0]:
+ return True
+
+ # If the IP address does not match any range, return False
+ return False
+
+ async def authenticate_request(self, request: Request) -> XCAPUri:
+ """Authenticate a request by checking IP and applying Digest or Basic authentication as needed."""
+ client_ip = get_client_ip(request)
+ xcap_uri = parseNodeURI(str(request.url), AuthenticationConfig.default_realm)
+
+ if xcap_uri.doc_selector.context == 'global':
+ return xcap_uri
+
+ realm = xcap_uri.user.domain
+
+ if realm is None:
+ raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)')
+
+ if request.method == "GET" and xcap_uri.application_id in public_get_applications:
+ return xcap_uri
+
+ if self.is_ip_trusted(client_ip):
+ return xcap_uri
+
+ if AuthenticationConfig.type == 'digest':
+ await self.digest_auth(request, realm)
+ elif AuthenticationConfig.type == 'basic':
+ await self.basic_auth(request, realm)
+ else:
+ raise ValueError('Invalid authentication type: %r. Please check the configuration.' % AuthenticationConfig.type)
+
+ return xcap_uri