import sqlalchemy.orm
import sqlalchemy.ext.asyncio

from .base import Base


class Database:
    """
    A class that is a composition of several other classes useful for accessing and manipulating
    a Postgresql database asynchronously.

    Usage:
        Database must first be initialized with a given postgresql connection url like so:
            >>> db = Database("postgresql://localhost:5432/postgres")

        After Database has been initialized two important objects that are composed into Database
        become available:
            -> async_engine
                - Relevant for handling the creation and deletion of many tables at once,
                  for example:
                    >>> async with db.async_engine.begin() as conn:
                    >>>     await conn.run_sync(db.Base.metadata.drop_all)
                    >>>     await conn.run_sync(db.Base.metadata.create_all)
                - See the sqlalchemy documentation for further details at
                  https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.AsyncEngine
            -> async_session
                - Relevant for properly manipulating rows and columns within the database,
                  for example:
                    >>> from sqlalchemy import orm as sqlalchemy_orm
                    >>> from sqlalchemy import future as sqlalchemy_future
                    >>>
                    >>> async with db.async_session() as session:
                    >>>     async with session.begin():
                    >>>         session.add_all(
                    >>>             [
                    >>>                 A(bs=[B(), B()], data="a1"),
                    >>>                 A(bs=[B()], data="a2"),
                    >>>                 A(bs=[B(), B()], data="a3"),
                    >>>             ]
                    >>>         )
                    >>>
                    >>>     stmt = sqlalchemy_future.select(A).options(sqlalchemy_orm.selectinload(A.bs))
                    >>>
                    >>>     result = await session.execute(stmt)
                    >>>     await session.commit()
                - See the sqlalchemy documentation for further details at
                  https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.AsyncSession
    """

    # Declarative base whose metadata drop_all()/create_all() operate on.
    Base = Base

    def __init__(
        self,
        db_url: str,
        async_session_expire_on_commit: bool = False,
        **engine_kwargs
    ):
        """
        Constructor for the class

        Args:
            db_url: A string defining a standard postgresql url, for instance:
                postgresql://localhost:5432/postgres
            async_session_expire_on_commit: Forwarded to the session factory as
                ``expire_on_commit``; a boolean for determining if ORM instances loaded through
                a session should be expired after session.commit() is called.
            **engine_kwargs: Arguments that can be passed to sqlalchemy's create_async_engine
                function
        """
        # Goes through the property setter below, which rewrites the scheme to the asyncpg driver.
        self.connection_url = db_url
        self.async_engine = sqlalchemy.ext.asyncio.create_async_engine(
            self.connection_url, **engine_kwargs
        )
        # Session factory: calling self.async_session() yields a new AsyncSession bound to the engine.
        self.async_session = sqlalchemy.orm.sessionmaker(
            self.async_engine,
            expire_on_commit=async_session_expire_on_commit,
            class_=sqlalchemy.ext.asyncio.AsyncSession,
        )

    @property
    def connection_url(self) -> str:
        """
        Getter for self.connection_url

        Returns:
            The postgresql connection string, already rewritten to use the
            ``postgresql+asyncpg`` scheme
        """
        return self._connection_url

    @connection_url.setter
    def connection_url(self, url: str) -> None:
        """
        Converts a given typical postgresql string to our asynchronous driver used with sqlalchemy

        Whatever scheme precedes the first ``://`` is replaced with ``postgresql+asyncpg``; a url
        without any scheme is used as-is after the new scheme.

        Args:
            url: The given normal postgresql URL

        Returns:
            Nothing, setter for self.connection_url in the constructor
        """
        # maxsplit=1: only the leading scheme separator is stripped, so a later "://"
        # (e.g. inside a password or a query value) does not truncate the url.
        location = url.split("://", 1)[-1]
        self._connection_url = f"postgresql+asyncpg://{location}"

    async def drop_all(self) -> None:
        """Drops all tables registered on the given Base class' metadata from the connected database"""
        async with self.async_engine.begin() as conn:
            # metadata.drop_all is a synchronous API, hence run_sync on the async connection.
            await conn.run_sync(self.Base.metadata.drop_all)

    async def create_all(self) -> None:
        """Creates all tables for the given Base class"""
        async with self.async_engine.begin() as conn:
            # metadata.create_all is a synchronous API, hence run_sync on the async connection.
            await conn.run_sync(self.Base.metadata.create_all)