# -*- coding: utf-8 -*-
"""
Database data/Local File I/O module.
"""
import os
import sqlalchemy as sa
def sql_to_csv(
    stmt,
    engine: sa.Engine,
    filepath: str,
    chunksize: int = 1000,
    overwrite: bool = False,
):
    """
    Export the result of a SQL statement to a csv file.

    A header line with the selected column names is written first. The
    statement is then executed and its rows are fetched and appended to the
    file ``chunksize`` rows at a time, so only one chunk of rows is converted
    to a DataFrame at any moment.

    :param stmt: :class:`sqlalchemy.sql.selectable.Select` instance.
    :param engine: :class:`sqlalchemy.engine.base.Engine`.
    :param filepath: file path.
    :param chunksize: number of rows written to the csv file each time.
    :param overwrite: bool. NOTE: despite its name, ``True`` turns ON the
        existence check, i.e. an error is raised when ``filepath`` already
        exists. With the default ``False`` no check is made and an existing
        file is replaced.

    :raises FileExistsError: if ``overwrite`` is True and ``filepath``
        already exists.
    """
    if overwrite and os.path.exists(filepath):  # pragma: no cover
        # FileExistsError is a subclass of Exception, so callers that catch
        # the generic Exception raised previously keep working.
        raise FileExistsError("'%s' already exists!" % filepath)

    # Imported lazily so the module can be imported without pandas installed.
    import pandas as pd

    columns = [str(column.name) for column in stmt.selected_columns]

    # newline="" disables newline translation on the handle; pandas already
    # emits the platform line terminator, and translating it a second time
    # yields "\r\r\n" row endings on Windows.
    with open(filepath, "w", newline="") as f:
        # write header
        pd.DataFrame([], columns=columns).to_csv(f, header=True, index=False)

        # iterate big database table
        with engine.connect() as connection:
            result = connection.execute(stmt)
            while True:
                rows = result.fetchmany(chunksize)
                if not rows:
                    break
                chunk = pd.DataFrame(rows, columns=columns)
                chunk.to_csv(f, header=False, index=False)
def table_to_csv(
    table: sa.Table,
    engine: sa.Engine,
    filepath: str,
    chunksize: int = 1000,
    overwrite: bool = False,
):
    """
    Export an entire table to a csv file.

    Builds ``SELECT`` over ``table`` and delegates the export to
    :func:`sql_to_csv`.

    :param table: :class:`sqlalchemy.Table` instance.
    :param engine: :class:`sqlalchemy.engine.base.Engine`.
    :param filepath: file path.
    :param chunksize: number of rows written to the csv file each time.
    :param overwrite: bool, forwarded to :func:`sql_to_csv`. NOTE: despite its
        name, ``True`` makes the export raise if ``filepath`` already exists;
        with the default ``False`` an existing file is replaced.
    """
    stmt = sa.select(table)
    # Forward ``overwrite`` as well; it was previously accepted but ignored.
    sql_to_csv(stmt, engine, filepath, chunksize, overwrite)