Source code for sqlalchemy_mate.pt

# -*- coding: utf-8 -*-

"""
Pretty Table support. Allow you quickly print query result in ascii table.

:func:`from_everything`

**中文文档**

因为 PrettyTable 只能通过 from_db_cursor 来创建, 也就是说要求返回
ResultProxy, 而不是 ORM 对象的列表. 所以在 :func:`from_object`, :func:`from_query`
这些使用 ``session.query(Entity)`` 的函数中, 我们需要用特殊技巧, 让 session.query
最终返回 ResultProxy.
"""

import typing as T

import sqlalchemy as sa
import sqlalchemy.orm as orm

from prettytable import PrettyTable

from .utils import ensure_session, clean_session


def get_keys_values(
    item,
) -> T.Tuple[T.Union[list, tuple], T.Union[list, tuple]]:
    if isinstance(item.__class__, orm.DeclarativeMeta):
        keys, values = list(), list()
        for column in item.__class__.__table__.columns:
            keys.append(column.name)
            values.append(getattr(item, column.name))
        return keys, values
    elif isinstance(item, sa.Row):
        return list(item._fields), list(item)
    else:  # pragma: no cover
        raise TypeError


def from_result(result: T.Union[sa.Result, sa.ScalarResult]):
    pt = PrettyTable()
    try:
        first_item = next(result)
        keys, values = get_keys_values(first_item)
        pt.field_names = keys
        pt.add_row(values)
    except StopIteration:
        return pt

    if isinstance(result, sa.ScalarResult):
        for obj in result:
            pt.add_row([getattr(obj, key) for key in keys])
    elif isinstance(result, sa.Result):
        for row in result:
            pt.add_row(list(row))
    else:
        raise Exception

    return pt


[docs]def from_text_clause(t: sa.TextClause, engine: sa.Engine, **kwargs) -> PrettyTable: """ Execute a query in form of texture clause, return the result in form of :class:`PrettyTable`. """ with engine.connect() as connection: result = connection.execute(t, **kwargs) return from_result(result)
[docs]def from_stmt(stmt: sa.Select, engine: sa.Engine, **kwargs) -> PrettyTable: """ Create a :class:`prettytable.PrettyTable` from :class:`sqlalchemy.select`. :param sql: a ``sqlalchemy.sql.selectable.Select`` object. :param engine: an ``sqlalchemy.engine.base.Engine`` object. **中文文档** 将 sqlalchemy 的 sql expression query 结果放入 prettytable 中. """ with engine.connect() as connection: result = connection.execute(stmt, **kwargs) return from_result(result)
[docs]def from_table( table: sa.Table, engine: sa.Engine, limit: int = None, **kwargs ) -> PrettyTable: """ Select data in a database table and put into prettytable. Create a :class:`prettytable.PrettyTable` from :class:`sqlalchemy.Table`. :param table: a ``sqlalchemy.sql.schema.Table`` object :param engine: the engine or session used to execute the query. :param limit: int, limit rows to return. **中文文档** 将数据表中的数据放入 PrettyTable 中. """ stmt = sa.select(table) if limit is not None: stmt = stmt.limit(limit) with engine.connect() as connection: result = connection.execute(stmt, **kwargs) return from_result(result)
[docs]def from_model( orm_class, engine_or_session: T.Union[sa.Engine, orm.Session], limit: int = None, **kwargs ): """ Select data from the table defined by a ORM class, and put into prettytable :param orm_class: an orm class inherit from ``sqlalchemy.ext.declarative.declarative_base()`` :param engine_or_session: the engine or session used to execute the query. :param limit: int, limit rows to return. **中文文档** 将数据对象的数据放入 PrettyTable 中. 常用于快速预览某个对象背后的数据. """ ses, auto_close = ensure_session(engine_or_session) stmt = sa.select(orm_class.__table__) if limit is not None: stmt = stmt.limit(limit) result = ses.execute(stmt, **kwargs) tb = from_result(result) clean_session(ses, auto_close) return tb
[docs]def from_dict_list(data: T.List[dict]) -> PrettyTable: """ Construct a Prettytable from list of dictionary. """ tb = PrettyTable() if len(data) == 0: # pragma: no cover return tb else: tb.field_names = list(data[0].keys()) for row in data: tb.add_row(list(row.values())) return tb
[docs]def from_everything( everything: T.Union[ sa.TextClause, sa.Select, sa.Table, T.List[dict], orm.DeclarativeMeta, ], engine_or_session: T.Union[sa.Engine, orm.Session] = None, limit: int = None, **kwargs ): """ Construct a Prettytable from any kinds of sqlalchemy query. :type engine_or_session: Union[Engine, Session] :rtype: PrettyTable Usage:: from sqlalchemy import select sql = select(t_user) print(from_everything(sql, engine)) query = session.query(User) print(from_everything(query, session)) session.query(User) """ if isinstance(everything, sa.TextClause): return from_text_clause(everything, engine_or_session, **kwargs) if isinstance(everything, str): return from_text_clause(sa.text(everything), engine_or_session, **kwargs) if isinstance(everything, sa.Select): return from_stmt(everything, engine_or_session, **kwargs) if isinstance(everything, sa.Table): return from_table(everything, engine_or_session, limit=limit, **kwargs) if isinstance(everything, orm.DeclarativeMeta): return from_model(everything, engine_or_session, limit=limit, **kwargs) if isinstance(everything, sa.Result): return from_result(everything) if isinstance(everything, list): return from_dict_list(everything) raise Exception