Skip to content

auto_rest.queries

The queries module provides asynchronous wrapper functions around operations involving SQLAlchemy sessions. These utilities automatically account for variations in behavior between synchronous and asynchronous session types (i.e., Session and AsyncSession instances). This ensures consistent query handling and provides a streamlined interface for database interactions.

Example: Query Execution

Query utilities seamlessly support synchronous and asynchronous session types.

query = select(SomeTable).where(SomeTable.id == item_id)

with Session(...) as sync_session:
    result = await execute_session_query(sync_session, query)

with AsyncSession(...) as async_session:
    result = await execute_session_query(async_session, query)

commit_session(session) async

Commit a SQLAlchemy session.

Supports synchronous and asynchronous sessions.

Parameters:

Name Type Description Default
session DBSession

The session to commit.

required
Source code in auto_rest/queries.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
async def commit_session(session: DBSession) -> None:
    """Commit a SQLAlchemy session.

    Supports synchronous and asynchronous sessions.

    Args:
        session: The session to commit.
    """

    if isinstance(session, AsyncSession):
        await session.commit()

    else:
        session.commit()

delete_session_record(session, record) async

Delete a record from the database using an existing session.

Does not automatically commit the session. Supports synchronous and asynchronous sessions.

Parameters:

Name Type Description Default
session DBSession

The session to use for deletion.

required
record Result

The record to be deleted.

required
Source code in auto_rest/queries.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def delete_session_record(session: DBSession, record: Result) -> None:
    """Delete a record from the database using an existing session.

    Does not automatically commit the session.
    Supports synchronous and asynchronous sessions.

    Args:
        session: The session to use for deletion.
        record: The record to be deleted.
    """

    logger.debug("Deleting record.")
    if isinstance(session, AsyncSession):
        await session.delete(record)

    else:
        session.delete(record)

execute_session_query(session, query) async

Execute a query in the given session and return the result.

Supports synchronous and asynchronous sessions.

Parameters:

Name Type Description Default
session DBSession

The SQLAlchemy session to use for executing the query.

required
query Executable

The query to be executed.

required

Returns:

Type Description
Result

The result of the executed query.

Source code in auto_rest/queries.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def execute_session_query(session: DBSession, query: Executable) -> Result:
    """Execute a query in the given session and return the result.

    Supports synchronous and asynchronous sessions.

    Args:
        session: The SQLAlchemy session to use for executing the query.
        query: The query to be executed.

    Returns:
        The result of the executed query.
    """

    logger.debug(str(query).replace("\n", " "))
    if isinstance(session, AsyncSession):
        return await session.execute(query)

    return session.execute(query)

get_record_or_404(result)

Retrieve a scalar record from a query result or raise a 404 error.

Parameters:

Name Type Description Default
result Result

The query result to extract the scalar record from.

required

Returns:

Type Description
any

The scalar record if it exists.

Raises:

Type Description
HTTPException

If the record is not found.

Source code in auto_rest/queries.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def get_record_or_404(result: Result) -> any:
    """Retrieve a scalar record from a query result or raise a 404 error.

    Args:
        result: The query result to extract the scalar record from.

    Returns:
        The scalar record if it exists.

    Raises:
        HTTPException: If the record is not found.
    """

    if record := result.fetchone():
        return record

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Record not found")