Source code for sqlalchemy_mate.crud.inserting

# -*- coding: utf-8 -*-

"""
This module provide utility functions for insert operation.
"""

import typing as T
import math

import sqlalchemy as sa
from sqlalchemy.exc import IntegrityError

from ..utils import grouper_list


[docs]def smart_insert( engine: sa.Engine, table: sa.Table, data: T.Union[dict, T.List[dict]], minimal_size: int = 5, _connection: sa.Connection = None, _op_counter: int = 0, _ins_counter: int = 0, _is_first_call: bool = True, ) -> T.Tuple[int, int]: """ An optimized Insert strategy. Guarantee successful and highest insertion speed. But ATOMIC WRITE IS NOT ENSURED IF THE PROGRAM IS INTERRUPTED. :return: number of successful INSERT sql execution; number of inserted rows. **中文文档** 在Insert中, 如果已经预知不会出现IntegrityError, 那么使用Bulk Insert的速度要 远远快于逐条Insert。而如果无法预知, 那么我们采用如下策略: 1. 尝试Bulk Insert, Bulk Insert由于在结束前不Commit, 所以速度很快。 2. 如果失败了, 那么对数据的条数开平方根, 进行分包, 然后对每个包重复该逻辑。 3. 若还是尝试失败, 则继续分包, 当分包的大小小于一定数量时, 则使用逐条插入。 直到成功为止。 该Insert策略在内存上需要额外的 sqrt(nbytes) 的开销, 跟原数据相比体积很小。 但时间上是各种情况下平均最优的。 """ if _connection is None: _connection = engine.connect() insert = table.insert() if isinstance(data, list): # 首先进行尝试 bulk insert try: _connection.execute(insert, data) _connection.commit() _op_counter += 1 _ins_counter += len(data) # 失败了 except IntegrityError: _connection.rollback() # 分析数据量 n = len(data) # 如果数据条数多于一定数量 if n >= minimal_size**2: # 则进行分包 n_chunk = math.floor(math.sqrt(n)) for chunk in grouper_list(data, n_chunk): _op_counter, _ins_counter = smart_insert( engine=engine, table=table, data=chunk, minimal_size=minimal_size, _connection=_connection, _op_counter=_op_counter, _ins_counter=_ins_counter, _is_first_call=False, ) # 否则则一条条地逐条插入 else: for row in data: try: _connection.execute(insert.values(**row)) _connection.commit() _op_counter += 1 _ins_counter += 1 except IntegrityError: _connection.rollback() else: try: _connection.execute(insert, data) _connection.commit() _op_counter += 1 _ins_counter += 1 except IntegrityError: _connection.rollback() if _is_first_call: _connection.close() return _op_counter, _ins_counter