Python 连接 MySQL 处理 Excel 报表(一键备份和查询)
自从学了 Python 和 MySQL,再也不用为处理 Excel 报表而挠头了。下面请看笔者是如何处理 Excel 报表的。
一、Excel 文件
F:/public/玉米报表.xls

内容如下图:成品入库表

成品出库表

二、需求
1、一键备份 Excel 数据到 MySQL 数据库
2、一键查询库存报表
三、使用展示
1、点击Run,一键运行

2、登录
选择第1项登录,没有用户名的,可以根据提示进行注册,具体注册方法,这里不再展示。

3、第1项查看数据库,第2项选择使用数据库,第3项创建数据库及表

4、选择使用的数据库,这里笔者选择 demdb。

5、选择第1项导入Excel 报表,可以重复导入,以更新数据表。

6、选择第5项,可以查询入库情况

7、选择第6项,可以查询出库情况

8、选择第7项,可以查询库存情况

四、代码
def import_data(user, password, db_name, excel_path="F:/public/玉米报表.xls"):
    """Copy five sheets of an Excel workbook into MySQL tables.

    Each sheet listed in ``sheet_to_table`` is written to its own table.
    An existing table of the same name is dropped and recreated, so the
    import can be repeated to refresh the data.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database on 127.0.0.1:3306 that receives the tables.
        excel_path: Workbook to read; defaults to the path that was
            previously hard-coded in this function.

    Any exception raised while reading or writing is printed, not re-raised.
    """
    # Imported locally so the function also works when this module is
    # imported (the module only binds ``pd`` when run as a script).
    import pandas as pd
    from sqlalchemy import create_engine, text

    # Sheet name in the workbook -> destination table name.
    sheet_to_table = {
        "大包装入库": "big_bag_in",
        "小包装入库": "small_bag_in",
        "半成品入库": "semi_in",
        "成品入库": "finished_in",
        "成品出库": "finished_out",
    }

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}",
            echo=False,
        )
        # Passing a list of sheet names parses the workbook once and
        # returns a dict {sheet name: DataFrame}.
        frames = pd.read_excel(excel_path, sheet_name=list(sheet_to_table))
        print("read excel data successfully ! ")

        with engine.connect() as conn:
            for table in sheet_to_table.values():
                # text() is required by SQLAlchemy 2.x, which no longer
                # accepts plain strings in Connection.execute().
                conn.execute(text(f"drop table if exists `{table}`;"))
        print("executed successfully !")

        for sheet, table in sheet_to_table.items():
            frames[sheet].to_sql(name=table, con=engine, index=False, if_exists="replace")
        print("imported successfully !")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def query_data(user, password, db_name, tb_name):
    """Print the first five rows of table ``tb_name`` in database ``db_name``.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database on 127.0.0.1:3306 to connect to.
        tb_name: Name of the table to read (a plain, unqualified name).

    Any exception raised while querying is printed, not re-raised.
    """
    # Imported locally so the function also works when this module is
    # imported (the module only binds ``pd`` when run as a script).
    import pandas as pd
    from sqlalchemy import create_engine

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}",
            echo=False,
        )
        # The table name is typed in by the user; back-quote it (doubling
        # embedded back-quotes) so it cannot terminate the identifier and
        # inject additional SQL.
        quoted_table = "`" + tb_name.replace("`", "``") + "`"
        sql = f"select * from {quoted_table} limit 5;"
        df = pd.read_sql(sql, engine)
        print("""
查询结果
""")
        print("-------------------------------------------------")
        print(df)
        print("-------------------------------------------------")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def delete_data(user, password, db_name, tb_name, tb_id, product_name):
    """Delete the rows of ``tb_name`` matching a sheet id and a product.

    Selects ``db_name``, prints the list of its tables, then deletes the
    rows where ``sht_id = tb_id`` and ``product = product_name``.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database to select (plain, unqualified name).
        tb_name: Table to delete from (plain, unqualified name).
        tb_id: Value compared against the ``sht_id`` column.
        product_name: Value compared against the ``product`` column.

    Any exception raised is printed, not re-raised.
    """
    from sqlalchemy import create_engine, text

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)
        # Identifiers come from user input; back-quote them (doubling
        # embedded back-quotes) so they cannot inject additional SQL.
        quoted_db = "`" + db_name.replace("`", "``") + "`"
        quoted_table = "`" + tb_name.replace("`", "``") + "`"

        # engine.begin() commits when the block succeeds and rolls back on
        # error, so the DELETE is persisted without relying on the legacy
        # autocommit of SQLAlchemy 1.x.
        with engine.begin() as conn:
            conn.execute(text(f"use {quoted_db}"))
            print(f"{db_name} selected !")
            result = conn.execute(text("show tables;")).fetchall()
            print(result)
            # Values are passed as bound parameters, never interpolated.
            stmt = text(
                f"delete from {quoted_table} "
                "where sht_id = :tb_id and product = :product_name"
            )
            conn.execute(stmt, {"tb_id": tb_id, "product_name": product_name})
        print("delete successfully !")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def update(user, password, db_name, tb_name, tb_id, product_name, new_qty1, new_qty2):
    """Overwrite the ``发货`` and ``实销`` columns of matching rows.

    Selects ``db_name``, prints the list of its tables, then updates the
    rows of ``tb_name`` where ``sht_id = tb_id`` and
    ``product = product_name``.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database to select (plain, unqualified name).
        tb_name: Table to update (plain, unqualified name).
        tb_id: Value compared against the ``sht_id`` column.
        product_name: Value compared against the ``product`` column.
        new_qty1: New value for the ``发货`` column.
        new_qty2: New value for the ``实销`` column.

    Any exception raised is printed, not re-raised.
    """
    from sqlalchemy import create_engine, text

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)
        # Identifiers come from user input; back-quote them (doubling
        # embedded back-quotes) so they cannot inject additional SQL.
        quoted_db = "`" + db_name.replace("`", "``") + "`"
        quoted_table = "`" + tb_name.replace("`", "``") + "`"

        # engine.begin() commits when the block succeeds and rolls back on
        # error, so the UPDATE is persisted without relying on the legacy
        # autocommit of SQLAlchemy 1.x.
        with engine.begin() as conn:
            conn.execute(text(f"use {quoted_db}"))
            print(f"{db_name} selected !")
            result = conn.execute(text("show tables;")).fetchall()
            print(result)
            # Values are passed as bound parameters, never interpolated.
            stmt = text(
                f"update {quoted_table} set 发货 = :new_qty1, 实销 = :new_qty2 "
                "where sht_id = :tb_id and product = :product_name"
            )
            conn.execute(stmt, {
                "new_qty1": new_qty1,
                "new_qty2": new_qty2,
                "tb_id": tb_id,
                "product_name": product_name,
            })
        print("updated successfully !")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def stockIn_list(user, password, db_name, tb_in):
    """Print the stock-in detail rows of table ``tb_in``.

    Selects ``product``, ``small_bag`` and ``小袋数量`` (shown as ``入库``)
    for every row whose ``product`` is not null.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database on 127.0.0.1:3306 to connect to.
        tb_in: Name of the stock-in table; interpolated into the SQL
            as-is, so it must come from trusted code.

    Any exception raised while querying is printed, not re-raised.
    """
    # Imported locally so the function also works when this module is
    # imported (the module only binds ``pd`` when run as a script).
    import pandas as pd
    from sqlalchemy import create_engine

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}",
            echo=False,
        )
        sql = f"select product,small_bag,小袋数量 as 入库 from {tb_in} where product is not null;"
        df = pd.read_sql(sql, engine)
        print("""
入库明细
""")
        print("--------------------------------------------")
        print(df)
        print("--------------------------------------------")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def stockOut_list(user, password, db_name, tb_out):
    """Print the stock-out detail rows of table ``tb_out``.

    Selects ``customer``, ``product``, ``包装类型`` and ``发货`` (shown as
    ``出库``) for every row of the table.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database on 127.0.0.1:3306 to connect to.
        tb_out: Name of the stock-out table; interpolated into the SQL
            as-is, so it must come from trusted code.

    Any exception raised while querying is printed, not re-raised.
    """
    # Imported locally so the function also works when this module is
    # imported (the module only binds ``pd`` when run as a script).
    import pandas as pd
    from sqlalchemy import create_engine

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}",
            echo=False,
        )
        sql = f"select customer,product,包装类型,发货 as 出库 from {tb_out};"
        df = pd.read_sql(sql, engine)
        print("""
出库明细
""")
        print("--------------------------------------------")
        print(df)
        print("--------------------------------------------")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def inventory(user, password, db_name, tb_in, tb_out):
    """Print stock-in rows, stock-out rows and a per-product stock summary.

    The summary groups both tables by ``product`` and ``description``,
    left-joins the stock-out totals onto the stock-in totals and computes
    ``库存 = 入库 - 出库 - 其他出库``, with a grand-total row appended by
    ``pivot_table(margins=True)``.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database on 127.0.0.1:3306 to connect to.
        tb_in: Stock-in table name (interpolated into the SQL as-is).
        tb_out: Stock-out table name (interpolated into the SQL as-is).

    Any exception raised while querying or aggregating is printed, not
    re-raised.
    """
    # Imported locally so the function also works when this module is
    # imported (the module only binds ``pd`` when run as a script).
    import pandas as pd
    from sqlalchemy import create_engine

    # Display settings: align CJK column headers and print frames in full.
    pd.set_option('display.unicode.ambiguous_as_wide', True)
    pd.set_option('display.unicode.east_asian_width', True)
    pd.set_option('expand_frame_repr', False)
    pd.set_option('display.max_columns', None, 'display.max_rows', None)
    pd.set_option('display.max_rows', 5000)

    keys = ['product', 'description']
    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}",
            echo=False,
        )
        sql_in = f"select product,description,小袋数量 as 入库 from {tb_in} where product is not null;"
        sql_out = f"select product,description,发货 as 出库,其他出库 from {tb_out};"
        stock_in = pd.read_sql(sql_in, engine)
        stock_out = pd.read_sql(sql_out, engine)
        # Missing quantities count as zero.
        stock_in.fillna(0, inplace=True)
        stock_out.fillna(0, inplace=True)

        print("""
入库明细
""")
        print("--------------------------------------------")
        print(stock_in)
        print("--------------------------------------------")
        print("""
出库明细
""")
        print("--------------------------------------------")
        print(stock_out)
        print("--------------------------------------------")

        in_totals = stock_in.groupby(keys)[['入库']].sum()
        out_totals = stock_out.groupby(keys)[['出库', '其他出库']].sum()
        # Left join: only products that appear in the stock-in table are kept.
        df = in_totals.merge(out_totals, how='left', on=keys)
        # Products with no stock-out rows get NaN from the join; treat as 0.
        df.fillna(0, inplace=True)
        df['库存'] = df['入库'] - df['出库'] - df['其他出库']
        # 'sum' (string) instead of the builtin sum: same result, without the
        # FutureWarning recent pandas emits for builtin callables.
        df = pd.pivot_table(
            df,
            index=keys,
            values=['入库', '出库', '其他出库', '库存'],
            aggfunc='sum',
            margins=True,
        )
        print("""
库存汇总
""")
        print("------------------------------------------------------------")
        print(df)
        print("------------------------------------------------------------")
        print('Finished !')
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def show_databases(user, password):
    """Print the result of ``show databases`` for the server at 127.0.0.1:3306.

    Args:
        user: MySQL user name.
        password: MySQL password.

    Any exception raised while querying is printed, not re-raised.
    """
    # Imported locally so the function also works when this module is
    # imported (the module only binds ``pd`` when run as a script).
    import pandas as pd
    from sqlalchemy import create_engine

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)
        df = pd.read_sql("show databases;", engine)
        print(df)
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def use_database(user, password, db_name):
    """Select ``db_name`` and print the names of its tables.

    The ``use`` statement only affects the connection opened inside this
    function, which is closed before returning; callers must still pass
    ``db_name`` to whatever they call next.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database to select (plain, unqualified name).

    Any exception raised is printed, not re-raised.
    """
    from sqlalchemy import create_engine, text

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)
        # The name is typed in by the user; back-quote it (doubling embedded
        # back-quotes) so it cannot inject additional SQL.
        quoted_db = "`" + db_name.replace("`", "``") + "`"
        with engine.connect() as conn:
            # text() is required by SQLAlchemy 2.x, which no longer accepts
            # plain strings in Connection.execute().
            conn.execute(text(f"use {quoted_db}"))
            print(f"{db_name} selected !")
            rows = conn.execute(text("show tables;")).fetchall()
        print("table list:")
        print("-------------")
        for row in rows:
            print(row[0])
        print("-------------")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def create_db(user, password, db_name, tb1_name, tb2_name):
    """Create database ``db_name`` and two identically-shaped product tables.

    Both statements use ``if not exists``, so calling this for an existing
    database or table leaves it untouched.

    Args:
        user: MySQL user name.
        password: MySQL password.
        db_name: Database to create (plain, unqualified name).
        tb1_name: First table to create inside ``db_name``.
        tb2_name: Second table to create inside ``db_name``.

    Any exception raised is printed, not re-raised.
    """
    from sqlalchemy import create_engine, text

    # NOTE(review): ctime previously also carried "on update
    # current_timestamp", which made it identical to utime; it is assumed
    # to be a creation timestamp and is now only set on insert - confirm.
    table_ddl = """create table if not exists {table}(
product_id int auto_increment primary key,
product_name varchar(25) not null ,
description varchar(50),
price decimal(4,2),
qty int not null default 0,
ctime datetime default current_timestamp,
utime datetime default current_timestamp on update current_timestamp
) engine=InnoDB default charset=utf8;
"""

    engine = None
    try:
        engine = create_engine(
            f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)
        # Identifiers come from user input; back-quote them (doubling
        # embedded back-quotes) so they cannot inject additional SQL.
        quoted_db = "`" + db_name.replace("`", "``") + "`"

        # Everything runs on ONE connection: "use" is per-connection state,
        # so issuing it on one pooled connection and the CREATE TABLE on
        # another only works if the pool happens to return the same one.
        # No explicit commit is issued: these DDL statements commit
        # implicitly in MySQL.
        with engine.connect() as conn:
            conn.execute(text(f"create database if not exists {quoted_db};"))
            print(f"{db_name} created successfully !")
            conn.execute(text(f"use {quoted_db}"))
            for table_name in (tb1_name, tb2_name):
                quoted_table = "`" + table_name.replace("`", "``") + "`"
                conn.execute(text(table_ddl.format(table=quoted_table)))
                print(f"{table_name} created successfully !")
    except Exception as e:
        print(e)
    finally:
        # A new engine (and connection pool) is built per call; release it.
        if engine is not None:
            engine.dispose()
def _table_menu(user, password, db_name):
    """Run the menu loop for one selected database until the user exits."""
    msg3 = """
商品管理系统 2.0 版
--------------------------------
0. exit
1. 导入 Excel 数据表
2. 查询
3. 删除数据
4. 修改数据
5. 入库查询
6. 出库查询
7. 库存查询
--------------------------------
"""
    while True:
        print(msg3)
        num = input('请选择操作项目编号: ')
        if num in ('q', 'Q', '0'):
            print('退出系统 !')
            break
        elif num == '1':
            import_data(user, password, db_name)
        elif num == '2':
            # The banner text after ">" was lost in the published source;
            # only the visible part is kept.
            print("----------query table >")
            tb_name = input("table name: ")
            query_data(user, password, db_name, tb_name)
        elif num == '3':
            print("----------delete table >")
            tb_name = input("table name: ")
            tb_id = input("table id: ")
            product_name = input("input product name: ")
            delete_data(user, password, db_name, tb_name, tb_id, product_name)
        elif num == '4':
            print("----------update table >")
            tb_name = input("please input table name: ")
            tb_id = input("please input table id: ")
            product_name = input("please input product name: ")
            new_qty1 = input("please input new qty1: ")
            new_qty2 = input("please input new qty2: ")
            update(user, password, db_name, tb_name, tb_id, product_name, new_qty1, new_qty2)
        elif num == '5':
            stockIn_list(user, password, db_name, "finished_in")
        elif num == '6':
            stockOut_list(user, password, db_name, "finished_out")
        elif num == '7':
            inventory(user, password, db_name, "finished_in", "finished_out")
        else:
            print('input error!,please input again !')


def _database_menu():
    """Run the post-login menu loop: list, select or create a database."""
    msg2 = """
欢迎进入商品进销存管理系统 2.0 版
------------------------------------------------
0. quit
1. show databases
2. use database
3. create new database and new table
-------------------------------------------------
"""
    # NOTE(review): the MySQL account is hard-coded and independent of the
    # application login; move it to configuration before real use.
    db_user = 'wyj'
    db_password = 'wyj'

    while True:
        print(msg2)
        handle = input('请选择操作项目: ')
        if handle in ('q', 'Q', '0'):
            print('quit !')
            break
        elif handle == '1':
            show_databases(db_user, db_password)
        elif handle == '2':
            db_name = input("please select database name: ")
            use_database(db_user, db_password, db_name)
            _table_menu(db_user, db_password, db_name)
        elif handle == '3':
            db_name = input("please input new database name: ")
            tb1_name = input("please input new table1 name: ")
            tb2_name = input("please input new table2 name: ")
            create_db(db_user, db_password, db_name, tb1_name, tb2_name)
            # As before, creating a database returns to the login screen.
            break
        else:
            print('Error! please input again !')


def main():
    """Run the interactive console program, starting at the login screen.

    The login screen offers login, registration and employee insertion
    (all delegated to the ``login_func`` module). A successful login opens
    the database menu, which in turn opens the per-database menu.
    """
    import getpass

    # Imported locally so main() also works when this module is imported
    # (the module only binds ``pd`` when run as a script).
    import pandas as pd

    import login_func as login

    # Display settings: align CJK column headers and print frames in full.
    pd.set_option('display.unicode.ambiguous_as_wide', True)
    pd.set_option('display.unicode.east_asian_width', True)
    pd.set_option('expand_frame_repr', False)
    pd.set_option('display.max_columns', None, 'display.max_rows', None)
    pd.set_option('display.max_rows', 5000)

    msg1 = """
登录界面
--------------------------------------------
0. exit/quit
1. login
2. register
3. insert employee info
--------------------------------------------
"""
    while True:
        print(msg1)
        choice = input('please input your choice: ')
        if choice in ('q', 'Q', '0'):
            print('quit !')
            break
        elif choice == '1':
            user = input("please input user name: ")
            password = getpass.getpass("enter password:")
            if not login.login(user, password):
                print("用户名或密码错误!")
            else:
                print(f"{user} login successfully!")
                _database_menu()
        elif choice == '2':
            register_name = input("please input register name: ")
            employee_id = input("please input employee id : ")
            is_user_exists = login.user_exists(register_name)
            is_employee_exists = login.employee_exists(employee_id)
            if is_user_exists:
                print("user exists ,can not register,please change another user name.")
            elif not is_employee_exists:
                print('not employee , can not register.')
            else:
                # getpass keeps the new password off the screen, matching
                # how the login branch reads it.
                pwd1 = getpass.getpass('please input your password: ')
                pwd2 = getpass.getpass('please confirm your password: ')
                if pwd1 == pwd2:
                    login.register(register_name, pwd1)
                    print("ok,registered successfully!")
                else:
                    # Previously a mismatch was ignored without any feedback.
                    print("passwords do not match, registration cancelled.")
        elif choice == '3':
            employee_id = input("please input employee id: ")
            employee_name = input("please input employee name: ")
            login.insert_employee_info(employee_name, employee_id)
        else:
            print('Error! please input again !')
if __name__ == '__main__':
    import pandas as pd

    # pandas display options, applied one at a time in their original order
    # (display.max_rows is first cleared and then capped at 5000).
    display_options = (
        ('display.unicode.ambiguous_as_wide', True),
        ('display.unicode.east_asian_width', True),
        ('expand_frame_repr', False),
        ('display.max_columns', None),
        ('display.max_rows', None),
        ('display.max_rows', 5000),
    )
    for option_name, option_value in display_options:
        pd.set_option(option_name, option_value)

    # Start the interactive console program.
    main()
好了,还不赶快去试一下。