61.8 Async SQLAlchemy with asyncpg
Alright, let’s talk about making SQLAlchemy sing asynchronously. You’re here because you’ve felt the pain of your beautifully crafted FastAPI (or whatever async framework you’re using) grinding to a halt every time it has to go talk to the database, waiting patiently while your precious event loop blocks. It’s 2023, we don’t wait for anything anymore, not even our databases.
So we reach for asyncpg and SQLAlchemy’s async support. This isn’t your grandfather’s ORM. It’s a different beast, and you have to treat it with respect, or it will bite you. The core idea is simple: instead of executing queries and blocking the thread until the database responds, we await the results, freeing the event loop to go handle other requests while the database does its thing.
The Core Setup: It’s All About the Engine
First, forget create_engine. That’s for the blocking, old-world charm. We use create_async_engine. This function doesn’t just take a connection string; it takes an async connection string. Notice the +asyncpg driver. This is non-negotiable. The engine it creates manages a pool of async connections under the hood.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# Note the '+asyncpg' driver. This is what tells SQLAlchemy to use asyncpg.
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/your_db"
# echo=True is great for development, awful for production.
# The pool size here is a suggestion; tune it for your actual database.
async_engine = create_async_engine(
DATABASE_URL,
echo=True,
pool_size=20,
max_overflow=10
)
# This is our factory for creating AsyncSession objects.
# We bind it to our async_engine and set other sensible defaults.
AsyncSessionLocal = sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False, # Usually what you want in async to avoid lazy load issues after commit
autoflush=False
)
Your First Async Session: A Word of Caution
Now, how do you use this AsyncSession? The most important rule: You cannot use the session outside of an async context. This seems obvious, but it’s the number one foot-gun. Every interaction is a potential await point.
Here’s the right way to do it, using an async context manager. This is your best friend. It handles the rollback on exception and close for you, which is critical because forgetting to close an async session will leave connections dangling in your pool.
from my_database_setup_file import AsyncSessionLocal
from my_models import User
async def create_user(username: str):
# This 'async with' block is the key.
async with AsyncSessionLocal() as session:
# See? We have to 'await' the session.execute()!
result = await session.execute(
select(User).where(User.username == username)
)
existing_user = result.scalar_one_or_none()
if existing_user:
return f"User {username} already exists."
new_user = User(username=username)
session.add(new_user)
# And we have to 'await' the commit too! Nothing is implicit.
await session.commit()
# After commit, new_user is expired by default unless you disable it.
return f"Created user: {new_user.username}"
Why expire_on_commit=False is Your Friend
I hinted at this in the setup. In standard SQLAlchemy, after a session.commit(), all your object’s attributes are expired. The next time you access one, it’ll automatically lazy-load the data from the database. In a blocking world, that’s just a minor performance hiccup. In an async world, that’s a deadly trap.
Imagine you commit your user, then later in the same function try to print user.email. Surprise! That’s an attribute access, which triggers a lazy load. But you can’t do I/O without an await. This will throw a brutal RuntimeWarning: coroutine was never awaited and probably break everything. Setting expire_on_commit=False stops this madness. The object’s state remains after the commit. Just be aware you might be working with stale data if you hold onto the object for a long time.
The Pitfalls: Lazy Loading is Now a War Crime
This brings us to the single biggest “gotcha” in async SQLAlchemy: Do not use lazy loading. Just don’t. It was always a questionable pattern, but in async, it’s an anti-pattern. You will forget to await it, and your code will fail in confusing ways.
You must be explicit. Use selectinload or joinedload to eagerly load the relationships you know you’ll need in the same query.
from sqlalchemy import select
from sqlalchemy.orm import selectinload
async def get_user_with_posts(user_id: int):
async with AsyncSessionLocal() as session:
# Bad: will cause problems if you try to access '.posts' later
# result = await session.execute(select(User).where(User.id == user_id))
# Good: explicitly load the posts relationship upfront
query = select(User).where(User.id == user_id).options(selectinload(User.posts))
result = await session.execute(query)
user = result.scalar_one_or_none()
# Now this is safe, because the posts were loaded in the initial query.
if user:
for post in user.posts:
print(post.title)
return user
Transactions: Getting It Right
Managing transactions manually is similar but, you guessed it, requires await calls.
async def transfer_funds(from_id: int, to_id: int, amount: float):
async with AsyncSessionLocal() as session:
# Begin a transaction explicitly
async with session.begin():
# We're now inside a transaction block
get_from_stmt = select(Account).where(Account.id == from_id).with_for_update()
from_acc = (await session.execute(get_from_stmt)).scalar_one()
if from_acc.balance < amount:
raise ValueError("Insufficient funds")
get_to_stmt = select(Account).where(Account.id == to_id).with_for_update()
to_acc = (await session.execute(get_to_stmt)).scalar_one()
from_acc.balance -= amount
to_acc.balance += amount
# The 'async with session.begin():' block automatically commits on exit
# or rolls back on exception. No explicit 'await session.commit()' needed.
The key takeaway? Async SQLAlchemy gives you fantastic performance benefits by working harmoniously with your async framework, but it demands more discipline and explicitness from you. Stop being lazy (literally, regarding loading) and start being deliberate about your queries and sessions. It’s a trade-off: a bit more verbosity for a lot more scalability. And honestly, that’s a trade I’ll make any day.