15 lines
527 B
Python
15 lines
527 B
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from sqlalchemy.engine import URL
|
||
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
|
||
|
|
from sqlalchemy.ext.asyncio import create_async_engine as _create_async_engine
|
||
|
|
from sqlalchemy.orm import sessionmaker
|
||
|
|
|
||
|
|
|
||
|
|
def create_async_engine(url: URL | str) -> AsyncEngine:
    """Build an ``AsyncEngine`` for *url* using this module's pool settings.

    This is a thin wrapper around SQLAlchemy's own ``create_async_engine``
    (imported in this module as ``_create_async_engine``). It always passes
    ``pool_pre_ping=True`` and ``pool_recycle=3600`` so every caller gets the
    same pool configuration.

    Args:
        url: Database URL, either a SQLAlchemy ``URL`` object or a string.

    Returns:
        The engine returned by SQLAlchemy's ``create_async_engine``.
    """
    # Fixed pool options applied to every engine created through this helper.
    pool_options = {"pool_pre_ping": True, "pool_recycle": 3600}
    return _create_async_engine(url=url, **pool_options)
|
||
|
|
|
||
|
|
|
||
|
|
def get_session_maker(engine: AsyncEngine) -> sessionmaker:
    """Return a ``sessionmaker`` bound to *engine* that uses ``AsyncSession``.

    The factory is configured with ``class_=AsyncSession`` and
    ``expire_on_commit=False``.

    Args:
        engine: The async engine that the factory is bound to.

    Returns:
        A ``sessionmaker`` configured as described above.
    """
    # Session options shared by every session this factory produces.
    session_options = {"class_": AsyncSession, "expire_on_commit": False}
    return sessionmaker(engine, **session_options)
|