# -*- coding: utf-8 -*-"""This module provide utility functions for select operation."""fromtypingimportList,Union,Iterablefromsqlalchemyimportselect,func,Column,Table,and_fromsqlalchemy.engineimportEngine,Result,Rowfrom..utilsimportensure_exact_one_arg_is_not_none
[docs]defcount_row(engine:Engine,table:Table,)->int:""" Return number of rows in a table. Example:: import sqlalchemy as sa import sqlalchemy_mate as sam t_users = sa.Table(...) engine = sa.create_engine(...) sam.selecting.count_row(engine, t_user) **中文文档** 返回一个表中的行数。 """withengine.connect()asconnection:returnconnection.execute(select([func.count()]).select_from(table)).fetchone()[0]
[docs]defby_pk(engine:Engine,table:Table,id_,)->Union[Row,None]:""" Return single row or None by primary key values. :param id_: single value if table has only one primary key, tuple / list of values if table has multiple primary keys, positioning is sensitive. Example:: import sqlalchemy as sa import sqlalchemy_mate as sam t_users = sa.Table "users", metadata, Column("id", sa.Integer, primary_key=True), ... ) engine = sa.create_engine(...) row = sam.selecting.by_pk(engine, t_user, 1) # one row or None print(row._fields) # keys print(tuple(row)) # values print(row._asdict()) # dict view """withengine.connect()asconnection:ifisinstance(id_,(tuple,list)):iflen(id_)!=len(table.primary_key):raiseValueErrorwhere_args=list()forcolumn,valueinzip(table.primary_key,id_):where_args.append(column==value)returnconnection.execute(select(table).where(and_(*where_args))).fetchone()else:iflen(table.primary_key)!=1:raiseValueErrorreturnconnection.execute(select(table).where(list(table.primary_key)[0]==id_)).fetchone()
[docs]defselect_all(engine:Engine,table:Table,)->Result:""" Select all rows from a table. Example:: for row in sam.selecting.select_all(engine, t_users): ... """s=select([table])withengine.connect()asconnection:returnconnection.execute(s)
[docs]defselect_single_column(engine:Engine,column:Column,)->list:""" Select data from single column. Example:: id_list = sam.selecting.select_all(engine, t_users.c.id) """s=select([column])withengine.connect()asconnection:return[row[0]forrowinconnection.execute(s)]
[docs]defselect_many_column(engine:Engine,columns:List[Column],)->List[tuple]:""" Select data from multiple columns. Example:: dataframe = sam.selecting.select_all(engine, [t_users.c.id, t_users.c.name]) """s=select(columns)withengine.connect()asconnection:return[tuple(row)forrowinconnection.execute(s)]
[docs]defselect_single_distinct(engine:Engine,column:Column,)->list:""" Select distinct data from single column. Example:: unique_name_list = sam.selecting.select_all(engine, t_users.c.name) """s=select([column]).distinct()withengine.connect()asconnection:return[row[0]forrowinconnection.execute(s)]
[docs]defselect_many_distinct(engine:Engine,columns:List[Column],)->List[tuple]:""" Select distinct data from multiple columns. Example:: dataframe = sam.selecting.select_many_distinct(engine, [t_users.c.id, t_users.c.name]) """s=select(columns).distinct()withengine.connect()asconnection:return[tuple(row)forrowinconnection.execute(s)]
[docs]defselect_random(engine:Engine,table:Table=None,columns:List[Column]=None,limit:int=None,perc:int=None)->Result:""" Randomly select some rows from table. :param perc: int from 1 ~ 99. (means 1% ~ 99%) Example:: # randomly select 100 users for row in sam.selecting.select_random(engine, t_users, limit=100): ... # randomly select 5% rows from users table for row in sam.selecting.select_random(engine, t_users, perc=5): ... """ensure_exact_one_arg_is_not_none(limit,perc)ensure_exact_one_arg_is_not_none(table,columns)iftableisnotNone:iflimitisnotNone:stmt=select(table).order_by(func.random()).limit(limit)else:ifperc>=100orperc<=0:raiseValueErrorselectable=table.tablesample(func.bernoulli(perc),name="alias",seed=func.random())args=[getattr(selectable.c,column.name)forcolumnintable.columns]stmt=select(*args)elifcolumnsisnotNone:iflimitisnotNone:stmt=select(columns).order_by(func.random()).limit(limit)else:ifperc>=100orperc<=0:raiseValueErrorselectable=columns[0].table.tablesample(func.bernoulli(perc),name="alias",seed=func.random())args=[getattr(selectable.c,column.name)forcolumnincolumns]stmt=select(*args)else:# pragma: no cover, for readability onlyraiseNotImplementedErrorwithengine.connect()asconnection:returnconnection.execute(stmt)
[docs]defyield_tuple(result:Result)->Iterable[tuple]:""" Yield rows in tuple values view. """forrowinresult:yieldtuple(row)
[docs]defyield_dict(result:Result)->Iterable[dict]:""" Yield rows in dict view. """forrowinresult:yielddict(row)