diff --git a/xcap/db/__init__.py b/xcap/db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/xcap/db/initialize.py b/xcap/db/initialize.py
new file mode 100644
index 0000000..5034853
--- /dev/null
+++ b/xcap/db/initialize.py
@@ -0,0 +1,94 @@
+import re
+
+from alembic import command
+from alembic.config import Config
+from alembic.util.exc import CommandError
+from application import log
+from sqlalchemy import create_engine
+
+from xcap.configuration import DatabaseConfig as XCAPDatabaseConfig
+from xcap.configuration import ServerConfig
+
+
+class DatabaseConfig(XCAPDatabaseConfig):
+    __cfgfile__ = 'config.ini'
+    __section__ = 'Database'
+
+    authentication_db_uri = ''
+    storage_db_uri = ''
+
+
+same_db = False
+alembic_cfg = Config("alembic.ini", 'storage_db')  # Alembic configuration
+alembic_auth_cfg = Config("alembic.ini", 'auth_db')
+
+
+def retry_on_error(engine, cfg, e):
+    error_message = str(e)
+    log.info(f"Alembic CommandError: {error_message}")
+    locations = cfg.get_main_option("version_locations")
+    set_all_version_locations(cfg)
+
+    match = re.search(r"Can't locate revision identified by '([\da-f]+)'", error_message)
+    if match:
+        missing_revision = match.group(1)
+        log.info(f"Detected missing revision: {missing_revision}")
+        try:
+            command.downgrade(cfg, f'{missing_revision}@base')  # Downgrade past the unknown revision
+            log.info("Downgrade success")
+        except CommandError as retry_error:
+            log.warning(f"Downgrade failed: {retry_error}")
+            return
+
+    cfg.set_main_option("version_locations", locations)
+    try:
+        command.upgrade(cfg, "head")  # Apply Alembic migrations
+    except CommandError as e:
+        log.warning(f"Migration failed: {e}")
+        return
+
+
+def set_all_version_locations(cfg):
+    storage_locations = alembic_cfg.get_main_option("version_locations")
+    auth_locations = alembic_auth_cfg.get_main_option("version_locations")
+    cfg.set_main_option("version_locations", f"{storage_locations}, {auth_locations}")
+
+
+def init_db():
+    global same_db
+    if ServerConfig.backend not in ['Database', 'OpenSIPS'] and not DatabaseConfig.storage_db_uri or not DatabaseConfig.authentication_db_uri:
+        return
+
+    if DatabaseConfig.authentication_db_uri == DatabaseConfig.storage_db_uri:
+        same_db = True
+        set_all_version_locations(alembic_auth_cfg)
+
+    if DatabaseConfig.authentication_db_uri.startswith('sqlite'):
+        auth_engine = create_engine(DatabaseConfig.authentication_db_uri, connect_args={"check_same_thread": False})
+        alembic_auth_cfg.set_main_option("sqlalchemy.url", DatabaseConfig.authentication_db_uri)  # Database URL used by Alembic
+        with auth_engine.connect():
+            if same_db:
+                command.upgrade(alembic_auth_cfg, "heads")  # Apply Alembic migrations
+                log.info("Database initialized and migrations applied.")
+            else:
+                try:
+                    command.upgrade(alembic_auth_cfg, "head")  # Apply Alembic migrations
+                except CommandError as e:
+                    retry_on_error(auth_engine, alembic_auth_cfg, e)
+                log.info("Authentication database initialized and migrations applied.")
+
+    if not same_db and DatabaseConfig.storage_db_uri.startswith('sqlite'):
+        engine = create_engine(DatabaseConfig.storage_db_uri, connect_args={"check_same_thread": False})
+        alembic_cfg.set_main_option("sqlalchemy.url", DatabaseConfig.storage_db_uri)  # Database URL used by Alembic
+        with engine.connect():
+            try:
+                command.upgrade(alembic_cfg, "head")  # Apply Alembic migrations
+            except CommandError as e:
+                retry_on_error(engine, alembic_cfg, e)
+
+        log.info("Storage database initialized and migrations applied.")
+
+# Initialize the databases and apply migrations when run as a script
+if __name__ == "__main__":
+    init_db()
+
diff --git a/xcap/db/manager.py b/xcap/db/manager.py
new file mode 100644
index 0000000..b55ac1c
--- /dev/null
+++ b/xcap/db/manager.py
@@ -0,0 +1,108 @@
+import sys
+from contextlib import asynccontextmanager
+from typing import AsyncIterator
+
+from application import log
+from application.notification import IObserver, NotificationCenter
+from sqlalchemy import event
+from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
+from sqlalchemy.orm import sessionmaker
+from sqlmodel import SQLModel
+from zope.interface import implementer
+
+from xcap.configuration import DatabaseConfig, ServerConfig
+from xcap.errors import DBError, NoDatabase
+
+
+@implementer(IObserver)
+class DatabaseConnectionManager:
+    AsyncSessionLocal = None
+    AsyncAuthSessionLocal = None
+    dburi = None
+
+    def __init__(self):
+        NotificationCenter().add_observer(self)
+
+    def handle_notification(self, notification):
+        if notification.name == 'db_uri':
+            self.configure_db_connection(notification.data)
+
+    def create_engine(self, uri):
+        if uri.startswith('sqlite'):
+            return create_async_engine(uri, connect_args={"check_same_thread": False}, echo=False)
+        elif uri.startswith('mysql'):
+            return create_async_engine(uri, echo=False)
+        else:
+            raise ValueError("Unsupported database URI scheme")
+
+    def configure_db_connection(self, uri=None):
+        """ Configure the database connection with the provided URI for Uvicorn """
+        if uri and self.dburi == uri:
+            return
+
+        if uri and ServerConfig.backend == 'Sipthor':
+            storage_db_uri = authentication_db_uri = uri
+        elif not uri:
+            storage_db_uri = DatabaseConfig.storage_db_uri
+            authentication_db_uri = DatabaseConfig.authentication_db_uri
+
+        engine = self.create_engine(storage_db_uri)
+        auth_engine = self.create_engine(authentication_db_uri)
+
+        self.dburi = uri
+        self.AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
+        self.AsyncAuthSessionLocal = sessionmaker(bind=auth_engine, class_=AsyncSession, expire_on_commit=False)
+
+
+@asynccontextmanager
+async def get_db_session() -> AsyncIterator[AsyncSession]:
+    if not connection_manager.AsyncSessionLocal:
+        raise NoDatabase
+    async with connection_manager.AsyncSessionLocal() as session:
+
+        @event.listens_for(session.get_bind(), 'handle_error')
+        async def handle_error(exc):
+            original_exception = exc.original_exception
+
+            exception_type = type(original_exception).__name__
+            error_code = getattr(original_exception, 'args', [None, None])[0]  # Error code if available
+            error_message = getattr(original_exception, 'args', [None, None])[1]  # Error message if available
+
+            log.error(f"{exception_type}: {error_code}, \"{error_message}\"")
+            raise DBError
+
+        yield session
+
+
+@asynccontextmanager
+async def get_auth_db_session() -> AsyncIterator[AsyncSession]:
+    if not connection_manager.AsyncAuthSessionLocal:
+        raise NoDatabase
+
+    async with connection_manager.AsyncAuthSessionLocal() as session:
+        @event.listens_for(session.get_bind(), 'handle_error')
+        async def handle_error(exc):
+            original_exception = exc.original_exception
+
+            exception_type = type(original_exception).__name__
+            error_code = getattr(original_exception, 'args', [None, None])[0]  # Error code if available
+            error_message = getattr(original_exception, 'args', [None, None])[1]  # Error message if available
+
+            log.error(f"{exception_type}: {error_code}, \"{error_message}\"")
+            raise DBError
+
+        yield session
+
+connection_manager = DatabaseConnectionManager()
+
+Base = SQLModel
+
+# logger = log.get_logger('sqlalchemy.engine')
+# logger.setLevel(log.level.DEBUG)
+
+if ServerConfig.backend == 'OpenSIPS' or ServerConfig.backend == 'Database':
+    if not DatabaseConfig.authentication_db_uri or not DatabaseConfig.storage_db_uri:
+        log.critical('Authentication DB URI and Storage DB URI must be provided')
+        sys.exit(1)
+    connection_manager.configure_db_connection()
+
diff --git a/xcap/db/models/__init__.py b/xcap/db/models/__init__.py
new file mode 100644
index 0000000..23aba05
--- /dev/null
+++ b/xcap/db/models/__init__.py
@@ -0,0 +1,2 @@
+from .auth_db import Subscriber
+from .storage_db import XCAP, Watcher
diff --git a/xcap/db/models/auth_db.py b/xcap/db/models/auth_db.py
new file mode 100644
index 0000000..9f10d38
--- /dev/null
+++ b/xcap/db/models/auth_db.py
@@ -0,0 +1,18 @@
+from typing import Optional
+
+from sqlalchemy import Column, String
+from sqlmodel import Field, SQLModel
+
+from xcap.configuration import DatabaseConfig
+
+
+class Subscriber(SQLModel, table=True):
+    __tablename__ = DatabaseConfig.subscriber_table
+    __database__ = 'auth_db'
+
+    id: Optional[int] = Field(default=None, primary_key=True)
+    username: str = Field(default=None, max_length=64, sa_column=Column(DatabaseConfig.user_col, String(64), nullable=True))
+    domain: str = Field(default=None, max_length=64, sa_column=Column(DatabaseConfig.domain_col, String(64), nullable=True))
+    password: str = Field(default=None, max_length=255, sa_column=Column(DatabaseConfig.password_col, String(255), nullable=True))
+    ha1: str = Field(default=None, max_length=64, sa_column=Column(DatabaseConfig.ha1_col, String(64), nullable=True))
+
diff --git a/xcap/db/models/storage_db.py b/xcap/db/models/storage_db.py
new file mode 100644
index 0000000..e6f5ab7
--- /dev/null
+++ b/xcap/db/models/storage_db.py
@@ -0,0 +1,49 @@
+from datetime import datetime
+from typing import Optional
+
+from sqlmodel import (Field, ForeignKey, Index, Relationship, SQLModel,
+                      UniqueConstraint)
+
+from xcap.configuration import DatabaseConfig
+
+
+class XCAP(SQLModel, table=True):
+    __tablename__ = DatabaseConfig.xcap_table
+    __database__ = 'storage_db'
+    id: Optional[int] = Field(default=None, primary_key=True)
+    subscriber_id: Optional[int] = Field(default=None, sa_column=ForeignKey("subscriber.id", ondelete="CASCADE"))
+    username: str = Field(max_length=64)
+    domain: str = Field(max_length=64)
+    doc: bytes  # Representing longblob as bytes
+    doc_type: int
+    etag: str = Field(max_length=64)
+    source: int = Field(default=0)
+    doc_uri: str = Field(max_length=255)
+    port: int = Field(default=0)
+
+    __table_args__ = (
+        UniqueConstraint("username", "domain", "doc_type", "doc_uri", name="account_doc_type_idx"),
+        Index("xcap_subscriber_id_exists", "subscriber_id"),
+        Index("source_idx", "source"),
+    )
+
+#    subscriber: Optional["Subscriber"] = Relationship(back_populates="none", cascade="all, delete-orphan")
+
+
+class Watcher(SQLModel, table=True):
+    __tablename__ = 'watchers'
+    __database__ = 'storage_db'
+
+    id: int = Field(default=None, primary_key=True)
+    presentity_uri: str = Field(max_length=255)
+    watcher_username: str = Field(max_length=64)
+    watcher_domain: str = Field(max_length=64)
+    event: str = Field(default="presence", max_length=64)
+    status: int
+    reason: Optional[str] = Field(default=None, max_length=64)
+    inserted_time: int
+
+    # Unique constraint for multiple columns
+    __table_args__ = (
+        UniqueConstraint('presentity_uri', 'watcher_username', 'watcher_domain', 'event', name='watcher_idx'),
+    )
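For reference, a minimal usage sketch (not part of this diff) of how the new pieces fit together: a caller opens a session through get_db_session() and queries the XCAP model. The fetch_document function and its arguments are hypothetical, shown only to illustrate the intended API.

from sqlmodel import select

from xcap.db.manager import get_db_session
from xcap.db.models import XCAP


async def fetch_document(username: str, domain: str, doc_uri: str):
    # get_db_session() raises NoDatabase if no storage backend was configured
    async with get_db_session() as session:
        stmt = select(XCAP).where(
            XCAP.username == username,
            XCAP.domain == domain,
            XCAP.doc_uri == doc_uri,
        )
        result = await session.execute(stmt)
        return result.scalars().first()  # XCAP row or None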