# -*- coding: utf-8 -*-"""This module provide utility functions for update operation."""fromtypingimportUnion,List,Tuple,Dict,AnyfromcollectionsimportOrderedDictfromsqlalchemyimportand_fromsqlalchemyimportTablefromsqlalchemy.engineimportEnginefrom..utilsimportensure_list
[docs]defupdate_all(engine:Engine,table:Table,data:Union[Dict[str,Any],List[Dict[str,Any]]],upsert=False,)->Tuple[int,int]:""" Update data by its primary_key column values. By default upsert is False. """withengine.connect()asconnection:update_counter=0insert_counter=0data=ensure_list(data)ins=table.insert()upd=table.update()# Find all primary key columnspk_cols=OrderedDict()forcolumnintable._columns:ifcolumn.primary_key:pk_cols[column.name]=columndata_to_insert=list()# Multiple primary key columniflen(pk_cols)>=2:forrowindata:result=engine.execute(upd.where(and_(*[col==row[name]forname,colinpk_cols.items()])).values(**row))ifresult.rowcount==0:data_to_insert.append(row)else:update_counter+=1# Single primary key columneliflen(pk_cols)==1:forrowindata:result=engine.execute(upd.where([col==row[name]forname,colinpk_cols.items()][0]).values(**row))ifresult.rowcount==0:data_to_insert.append(row)else:update_counter+=1else:# pragma: no coverdata_to_insert=data# Insert rest of dataifupsert:iflen(data_to_insert):engine.execute(ins,data_to_insert)insert_counter+=len(data_to_insert)returnupdate_counter,insert_counter
[docs]defupsert_all(engine:Engine,table:Table,data:Union[Dict[str,Any],List[Dict[str,Any]]],)->Tuple[int,int]:""" Update data by primary key columns. If not able to update, do insert. Example:: # define data model t_user = Table( "users", metadata, Column("id", Integer, primary_key=True), Column("name", String), ) # suppose in database we already have {"id": 1, "name": "Alice"} data = [ {"id": 1, "name": "Bob"}, # this will be updated {"id": 2, "name": "Cathy"}, # this will be added ] update_count, insert_count = upsert_all(engine, t_user, data) print(update_count) # number of row updated counter print(insert_count) # number of row inserted counter # will return: [{"id": 1, "name": "Bob"}, {"id": 2, "name": "Cathy"}] with engine.connect() as connection: print(connection.execute(select([table_user])).all()) **中文文档** 批量更新文档. 如果该表格定义了Primary Key, 则用Primary Key约束where语句. 对于 where语句无法找到的行, 自动进行批量 bulk insert. """returnupdate_all(engine=engine,table=table,data=data,upsert=True)