def do_db(self):
engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
sql = "select lot_no as '批号' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"
self.df1 = pd.read_sql_query(sql, engine)
print(self.df1)
读取EXCEL
def do_excel(self):
self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={'批号': str})
self.df2 = self.df2.drop_duplicates()
print(self.df2)
PS:这里需要注意的是,使用datacompy比较的两个列表中不能又重复的数据,所以要使用self.df2.drop_duplicates()
去重
比较列表,并将差异存入EXCEL
def dict_compare(self):
self.do_db()
self.do_excel()
compare = datacompy.Compare(self.df1, self.df2, join_columns=['批号'])
# print(compare.matches()) # 最后判断是否相等,返回 bool
# print(compare.report()) # 打印报告详情,返回 string
# print(compare.report(sample_count=5000)) # 打印报告详情,返回 string
df1_unq_rows = compare.df1_unq_rows
df2_unq_rows = compare.df2_unq_rows
writer = pd.ExcelWriter(self.file_name, engine='openpyxl')
writer.book = load_workbook(self.file_name)
df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的数据')
df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
writer.save()
writer.close()
查看datacompy文档
#!/usr/bin/python3
# -*- encoding: utf-8 -*-
@File :检查.py
@Time :2020/10/26 10:39:06
@Author :He
@Software :vsCode
import pymysql
import time
import datetime
import uuid
import os
from sqlalchemy import create_engine
import pandas as pd
import datacompy
from openpyxl import load_workbook
class mysql_class:
def __init__(self):
self.host = 'IP'
self.port = '端口'
self.passwd = '密码'
self.user = 'root'
self.db = ''
self.file_name = 'EXCEL.xlsx'
def do_db(self):
engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
sql = "select lot_no as '批号' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"
self.df1 = pd.read_sql_query(sql, engine)
print(self.df1)
def do_excel(self):
self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={'批号': str})
self.df2 = self.df2.drop_duplicates()
print(self.df2)
def getCurrentTime(self):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
def dict_compare(self):
self.do_db()
self.do_excel()
compare = datacompy.Compare(self.df1, self.df2, join_columns=['批号'])
df1_unq_rows = compare.df1_unq_rows
df2_unq_rows = compare.df2_unq_rows
writer = pd.ExcelWriter(self.file_name, engine='openpyxl')
writer.book = load_workbook(self.file_name)
df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的数据')
df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
writer.save()
writer.close()
if __name__ == "__main__":
os.chdir(os.path.abspath(os.path.dirname(__file__)))
starttime = datetime.datetime.now()
print(starttime)
mysql_class = mysql_class()
mysql_class.dict_compare()
endtime = datetime.datetime.now()
print(endtime)
print('\n数据处理成功!所用时间为:' + str((endtime - starttime).seconds))