Asyncio

Support for asyncio is largely built on top of the Alembic Cookbook example, inlined here for posterity:

import asyncio

# ... no change required to the rest of the code

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = AsyncEngine(
        engine_from_config(
            config.get_section(config.config_ini_section),
            prefix="sqlalchemy.",
            poolclass=pool.NullPool,
            future=True,
        )
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())

Note that this is a prerequisite for how one gets alembic itself to run with an async connection, when running alembic commands interatively yourself.

At this point, you just need to make sure the alembic_engine fixture is producing a async engine. something like

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_engine_async, AsyncEngine

@pytest.fixture
def alembic_engine(...):
    return create_async_engine(URL(...))


@pytest.fixture
def alembic_engine(...):
    engine = create_engine(URL(...))
    return AsyncEngine(engine)


# or, for example, with pytest-mock-resources
from pytest_mock_resources import create_postgres_fixture

alembic_engine = create_postgres_fixture(async_=True)

A slightly more versatile setup

The above env.py setup comes with a caveat. It assumes execution of the migrations solely through async. Due to the way sqlalchemy/alembic async works (as evidenced by even their suggested use of run_sync), this can be a problem.

For pytest-alembic the only such built in test is test_downgrade_leaves_no_trace. For compatibility with (majority) sync alembic use, it’s implemented sychronously, and internally requires performing transaction manipulation which would otherwise require re-entrant use of asyncio.run.

If you don’t use this test, and haven’t implemented any of your own which encounter this issue, then feel free to stick with the official alembic suggestion. However a slight reorganization of their suggested setup allows for both sychronous and asynchronous execution of migrations, and thus fixes test_downgrade_leaves_no_trace.

from sqlalchemy.ext.asyncio.engine import AsyncEngine

def run_migrations_online():
    connectable = context.config.attributes.get("connection", None)

    if connectable is None:
        connectable = AsyncEngine(
            engine_from_config(
                context.config.get_section(context.config.config_ini_section),
                prefix="sqlalchemy.",
                poolclass=pool.NullPool,
                future=True,
            )
        )

    # Note, we decide whether to run asynchronously based on the kind of engine we're dealing with.
    if isinstance(connectable, AsyncEngine):
        asyncio.run(run_async_migrations(connectable))
    else:
        do_run_migrations(connectable)


# Then use their setup for async connection/running of the migration
async def run_async_migrations(connectable):
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


# But the outer layer still allows sychronous execution also.
run_migrations_online()