Skip to content

auto_rest.queries

The queries module provides asynchronous wrapper functions around operations involving SQLAlchemy sessions. These utilities automatically account for variations in behavior between synchronous and asynchronous session types (i.e., Session and AsyncSession instances). This ensures consistent query handling and provides a streamlined interface for database interactions.

Example: Query Execution

Query utilities seamlessly support synchronous and asynchronous session types.

query = select(SomeTable).where(SomeTable.id == item_id)

with Session(...) as sync_session:
    result = await execute_session_query(sync_session, query)

with AsyncSession(...) as async_session:
    result = await execute_session_query(async_session, query)

apply_ordering_params(query, order_by=None, direction='asc')

Apply ordering to a database query.

Returns a copy of the provided query with ordering parameters applied.

Parameters:

Name Type Description Default
query Select

The database query to apply parameters to.

required
order_by str | None

The name of the column to order by.

None
direction Literal['desc', 'asc']

The direction to order by (defaults to "asc").

'asc'

Returns:

Type Description
Select

A copy of the query modified to return ordered values.

Source code in auto_rest/queries.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def apply_ordering_params(
    query: Select,
    order_by: str | None = None,
    direction: Literal["desc", "asc"] = "asc"
) -> Select:
    """Apply ordering to a database query.

    Returns a copy of the provided query with ordering parameters applied.

    Args:
        query: The database query to apply parameters to.
        order_by: The name of the column to order by.
        direction: The direction to order by (defaults to "asc").

    Returns:
        A copy of the query modified to return ordered values.
    """

    if order_by is None:
        return query

    if order_by not in query.columns:
        raise ValueError(f"Invalid column name: {order_by}")

    # Default to ascending order for an invalid ordering direction
    if direction == "desc":
        return query.order_by(desc(order_by))

    elif direction == "asc":
        return query.order_by(asc(order_by))

    raise ValueError(f"Invalid direction, use 'asc' or 'desc': {direction}")

apply_pagination_params(query, limit=0, offset=0)

Apply pagination to a database query.

Returns a copy of the provided query with offset and limit parameters applied.

Parameters:

Name Type Description Default
query Select

The database query to apply parameters to.

required
limit int

The number of results to return.

0
offset int

The offset to start with.

0

Returns:

Type Description
Select

A copy of the query modified to only return the paginated values.

Source code in auto_rest/queries.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def apply_pagination_params(query: Select, limit: int = 0, offset: int = 0) -> Select:
    """Apply pagination to a database query.

    Returns a copy of the provided query with offset and limit parameters applied.

    Args:
        query: The database query to apply parameters to.
        limit: The number of results to return.
        offset: The offset to start with.

    Returns:
        A copy of the query modified to only return the paginated values.
    """

    if offset < 0 or limit < 0:
        raise ValueError("Pagination parameters cannot be negative")

    # Do not apply pagination if not requested
    if limit == 0:
        return query

    return query.offset(offset or 0).limit(limit)

commit_session(session) async

Commit a SQLAlchemy session.

Supports synchronous and asynchronous sessions.

Parameters:

Name Type Description Default
session DBSession

The session to commit.

required
Source code in auto_rest/queries.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def commit_session(session: DBSession) -> None:
    """Commit a SQLAlchemy session.

    Supports synchronous and asynchronous sessions.

    Args:
        session: The session to commit.
    """

    if isinstance(session, AsyncSession):
        await session.commit()

    else:
        session.commit()

delete_session_record(session, record) async

Delete a record from the database using an existing session.

Does not automatically commit the session. Supports synchronous and asynchronous sessions.

Parameters:

Name Type Description Default
session DBSession

The session to use for deletion.

required
record Result

The record to be deleted.

required
Source code in auto_rest/queries.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
async def delete_session_record(session: DBSession, record: Result) -> None:
    """Delete a record from the database using an existing session.

    Does not automatically commit the session.
    Supports synchronous and asynchronous sessions.

    Args:
        session: The session to use for deletion.
        record: The record to be deleted.
    """

    if isinstance(session, AsyncSession):
        await session.delete(record)

    else:
        session.delete(record)

execute_session_query(session, query) async

Execute a query in the given session and return the result.

Supports synchronous and asynchronous sessions.

Parameters:

Name Type Description Default
session DBSession

The SQLAlchemy session to use for executing the query.

required
query Executable

The query to be executed.

required

Returns:

Type Description
Result

The result of the executed query.

Source code in auto_rest/queries.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
async def execute_session_query(session: DBSession, query: Executable) -> Result:
    """Execute a query in the given session and return the result.

    Supports synchronous and asynchronous sessions.

    Args:
        session: The SQLAlchemy session to use for executing the query.
        query: The query to be executed.

    Returns:
        The result of the executed query.
    """

    if isinstance(session, AsyncSession):
        return await session.execute(query)

    return session.execute(query)

get_record_or_404(result)

Retrieve a scalar record from a query result or raise a 404 error.

Parameters:

Name Type Description Default
result Result

The query result to extract the scalar record from.

required

Returns:

Type Description
any

The scalar record if it exists.

Raises:

Type Description
HTTPException

If the record is not found.

Source code in auto_rest/queries.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get_record_or_404(result: Result) -> any:
    """Retrieve a scalar record from a query result or raise a 404 error.

    Args:
        result: The query result to extract the scalar record from.

    Returns:
        The scalar record if it exists.

    Raises:
        HTTPException: If the record is not found.
    """

    if record := result.fetchone():
        return record

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Record not found")