def do_db(self):
        """Load the lot numbers stored in MySQL into ``self.df1`` and print them.

        The connection URL is assembled from ``self.passwd``, ``self.host``,
        ``self.port`` and ``self.db``; the account name is fixed to ``root``.
        The query returns a single column aliased to '批号' from
        ZM_TBL_DOMESTIC_GINNED_COTTON, limited to rows whose DELETE_FLAG is '0'.
        """
        url_parts = (
            "mysql+pymysql://root:", self.passwd,
            "@", self.host,
            ":", self.port,
            "/", self.db,
        )
        db_engine = create_engine("".join(url_parts))
        query = "select lot_no as '批号' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"
        lot_frame = pd.read_sql_query(query, db_engine)
        self.df1 = lot_frame
        print(self.df1)
  • 读取EXCEL
  •     def do_excel(self):
            self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={'批号': str})
            self.df2 = self.df2.drop_duplicates() 
            print(self.df2)
    

    PS:这里需要注意的是,使用datacompy比较的两个列表中不能有重复的数据,所以要使用self.df2.drop_duplicates()去重

  • 比较列表,并将差异存入EXCEL
  •     def dict_compare(self):
            self.do_db()
            self.do_excel()
            compare = datacompy.Compare(self.df1, self.df2, join_columns=['批号'])
            # print(compare.matches())  # 最后判断是否相等,返回 bool
            # print(compare.report())  # 打印报告详情,返回 string
            # print(compare.report(sample_count=5000))  # 打印报告详情,返回 string
            df1_unq_rows = compare.df1_unq_rows
            df2_unq_rows = compare.df2_unq_rows
            writer = pd.ExcelWriter(self.file_name, engine='openpyxl')
            writer.book = load_workbook(self.file_name)
            df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的数据')
            df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
            writer.save()
            writer.close()
    

    查看datacompy文档

    #!/usr/bin/python3
    # -*- encoding: utf-8 -*-
    '''
    @File        :检查.py
    @Time        :2020/10/26 10:39:06
    @Author      :He
    @Software    :vsCode
    '''
    import pymysql
    import time
    import datetime
    import uuid
    import os
    from sqlalchemy import create_engine
    import pandas as pd
    import datacompy
    from openpyxl import load_workbook
    class mysql_class:
        """Cross-check lot numbers in a MySQL table against an Excel sheet.

        ``dict_compare`` is the entry point: it loads both sides, compares
        them with datacompy on the '批号' column and writes the rows missing
        from either side back into the workbook as two extra sheets.
        """

        def __init__(self):
            # Connection settings; the values below are placeholders to fill in.
            self.host = 'IP'
            self.port = '端口'
            self.passwd = '密码'
            self.user = 'root'
            self.db = ''
            # Workbook that is read from and that the result sheets are written to.
            self.file_name = 'EXCEL.xlsx'

        def do_db(self):
            """Load the lot numbers stored in MySQL into ``self.df1`` and print them.

            The query returns a single column aliased to '批号' from
            ZM_TBL_DOMESTIC_GINNED_COTTON, limited to rows whose DELETE_FLAG is '0'.
            """
            # Use the configured account rather than a hard-coded "root" so that
            # changing ``self.user`` actually takes effect.
            url = f"mysql+pymysql://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db}"
            engine = create_engine(url)
            sql = "select lot_no as '批号' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"
            try:
                self.df1 = pd.read_sql_query(sql, engine)
            finally:
                # Release pooled connections; the engine is not reused.
                engine.dispose()
            print(self.df1)

        def do_excel(self):
            """Read the lot numbers from the workbook into ``self.df2`` and print them.

            Only the first column of sheet "Sheet1" is read. The '批号' column is
            converted with ``str`` and pandas' default NaN markers are disabled.
            Duplicate rows are dropped before the frame is stored.
            """
            frame = pd.read_excel(
                self.file_name,
                usecols=[0],
                sheet_name="Sheet1",
                keep_default_na=False,
                converters={'批号': str},
            )
            self.df2 = frame.drop_duplicates()
            print(self.df2)

        def getCurrentTime(self):
            """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
            # strftime() formats the current local time when no time tuple is given.
            return time.strftime('%Y-%m-%d %H:%M:%S')

        def dict_compare(self):
            """Compare DB and Excel lot numbers and write the differences to the workbook.

            Two sheets are written into ``self.file_name``:

            * 'EXCEL缺少的数据' - rows found only in the database frame.
            * 'DB缺少的数据'    - rows found only in the Excel frame.
            """
            self.do_db()
            self.do_excel()
            compare = datacompy.Compare(self.df1, self.df2, join_columns=['批号'])
            # Append to the existing workbook instead of assigning ``writer.book``:
            # opening the writer in the default write mode truncates the file
            # before it can be loaded, and recent pandas releases removed both
            # the ``book`` setter and ``ExcelWriter.save()``. The context manager
            # saves and closes the file exactly once, even on error.
            with pd.ExcelWriter(
                self.file_name,
                engine='openpyxl',
                mode='a',
                if_sheet_exists='replace',  # re-running overwrites earlier result sheets
            ) as writer:
                compare.df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的数据')
                compare.df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
    if __name__ == "__main__":
        # Work from the script's own directory so the relative workbook path
        # resolves no matter where the script is launched from.
        os.chdir(os.path.abspath(os.path.dirname(__file__)))
        starttime = datetime.datetime.now()
        print(starttime)
        # Bind the instance to its own name; reusing ``mysql_class`` would
        # overwrite the class with the instance.
        checker = mysql_class()
        checker.dict_compare()
        endtime = datetime.datetime.now()
        print(endtime)
        # total_seconds() covers the whole interval; ``.seconds`` alone drops full days.
        elapsed_seconds = int((endtime - starttime).total_seconds())
        print('\n数据处理成功!所用时间为:' + str(elapsed_seconds))