Python数据分析(四)

现在上手一个数据分析实际案例:

从数据结果集本身入手,使用通用性检查分析数据+业务规则校验是否有异常,最终形成分析报告,若有问题及时做出人工干预

背景:数据最终到应用层面,会经过很多前面很多节点处理,包括数据清洗、数据转换、数据统计等等,这里面步骤有可能做了,也可能没有做,最终输出的结果在经过验证阶段,即使在验证那一刻保证了数据逻辑是正确的,也很难保证后续的每次跑数都不会出现问题。很多可能都会造成问题,包括数据贴源层数据的丢失,数据的延迟,调度的失败,依赖冲突,逻辑的误改动,后面到应用端节点数据接口问题等等,长期我们有关于数据处理链路可靠性保障方案,短期或者临时的话,也需要能短期能实现使用的方案,于是就有了这通过python来分析结果的一个案例

在已经有了python的编程知识了,结合Deepseek之后,输出分析案例的效率大大提高,我们现在差的也就是解决方案的思路了,具体的代码能力已经可以交给Deepseek来实现,而我要做的就是能对代码进行实际的调试使用,以达到最终目的

我开始对Deepseek的从前面进行一些背景的介绍和问题的引入后,最后实际编码前,问了Deepseek

好的,我清楚了,我现在要面临的业务场景是,结果集数据一年跑四次,频率极低,我打算在每次正式使用前几天就跑出这个结果,进行异常检查,然后生成分析报告,目前我能想到的,是对某些值进行一个值范围的校验,比如逾期率,只会在0%-100%,比如字段为空就是异常的校验,比如某个库存平均值,超出阈值水平的校验,比如现有的数据条目和历史上次数据的一个比较,异常的检出,我这边需要将这些规则通过python代码的方式进行处理,然后生成一份excel,第一个sheet有异常分析描述,后面的第二个sheet是记录了异常检查数据,请你根据我上方的描述,生成一份完整的python代码


Deepseek回复了个大体的框架,也提供了实际的代码后,我觉得还有必要再细化下我的规则,以及能提供更详细的模拟数据。于是我准备了两份数据,以及数据内各个字段的校验规则

对关键字段的校验规则,请按如下实现

group_customer_code
customer_code_id
customer_code
对应关系如下: customer_code_id为集团客户ID,与customer_code关系是一对多; 每个customer_code的指标数据除了credit_ccoefficient,其他指标值都会赋一样的值
对基础数据字段的检查,请给我分类抽象到方法
avg_sales_amount:主要是和历史数据比较,对比之前的数据高于10倍或者,低于10倍都是属于异常数据
avg_profit_amount:主要是和历史数据比较,对比之前的数据高于10倍或者,低于10倍都是属于异常数据
vol_turnover_rate:主要是和历史数据比较,对比之前的数据高于5倍或者,低于5倍都是属于异常数据
avg_overdue_ratio:有效值范围在0 - 1,其他是属于异常;在同一customer_star_rating,数值属于偏差太极端的
inbound_num:历史数据值 > 0, 当前数据变成0
为异常数据, customer_star_rating:当前数据比较历史数据值波动太大,比如从1跨多级到4、5、6
这种或者6级变成1级这种有异常;整体各个星级的占比情况波动太大,可能整体数据会有异常


Deepseek给出的代码后,我复制到我的编译器,调整了文件的获取地址,执行后,出了卡方检验报错问题,我觉得这并非必要,注释不解决了。后面代码运行成功,在这个基础上,改变了模拟数据,构造验证代码是否正确能覆盖校验场景的数据,一顿操作下来,全命中了校验场景。效果还是很不错了

从一开始的思路,到最终实现,出了个excel报告,中间也有些事情耽搁,大概花了2个小时,这效率还是蛮高的了。


最后附上代码

import pandas as pd
import numpy as np
from datetime import datetime
from scipy import stats


class DataValidator:
    def __init__(self, current_data, history_data=None):
        # 数据校验
        self._validate_input_data(current_data, history_data)

        self.current_df = current_data
        self.history_df = history_data
        self.anomalies = []

        # 预处理历史数据
        self.history_grouped = None
        if self._has_valid_history():
            self._prepare_history_data()

    def _validate_input_data(self, current, history):
        """校验输入数据完整性"""
        required_columns = {
            'customer_code_id', 'customer_star_rating',
            'avg_sales_amount', 'avg_profit_amount',
            'vol_turnover_rate', 'avg_overdue_ratio',
            'inbound_num'
        }

        if current.empty:
            raise ValueError("当前数据集不能为空")
        if not required_columns.issubset(current.columns):
            missing = required_columns - set(current.columns)
            raise ValueError(f"当前数据缺少必要字段:{missing}")

        if history is not None and not history.empty:
            if not required_columns.issubset(history.columns):
                missing = required_columns - set(history.columns)
                raise ValueError(f"历史数据缺少必要字段:{missing}")

    def _has_valid_history(self):
        """检查是否有有效历史数据"""
        return self.history_df is not None and not self.history_df.empty

    def _prepare_history_data(self):
        """预处理历史数据(带异常保护)"""
        try:
            self.history_grouped = self.history_df.groupby('customer_code_id').agg({
                'avg_sales_amount': 'mean',
                'avg_profit_amount': 'mean',
                'vol_turnover_rate': 'mean',
                'avg_overdue_ratio': 'mean',
                'inbound_num': 'mean',
                'customer_star_rating': lambda x: x.mode()[0] if not x.empty else np.nan
            })
        except KeyError as e:
            raise ValueError(f"历史数据字段异常:{str(e)}") from e

    def _add_anomaly(self, check_type, field, value, code_id, detail):
        """统一记录异常"""
        self.anomalies.append({
            '检查类型': check_type,
            '异常字段': field,
            '当前值': value,
            '客户ID': code_id,
            '详情': detail
        })

    def check_value_range(self, field, valid_range, code_id, current_value):
        """范围检查"""
        if not (valid_range[0] <= current_value <= valid_range[1]):
            self._add_anomaly(
                '范围异常', field, current_value, code_id,
                f"{field}超出有效范围[{valid_range[0]}-{valid_range[1]}]"
            )

    def check_historical_volatility(self, field, code_id, current_value, threshold):
        """历史波动检查"""
        if not self._has_valid_history() or code_id not in self.history_grouped.index:
            return

        hist_value = self.history_grouped.loc[code_id, field]
        if pd.isna(hist_value) or hist_value == 0:
            return

        ratio = current_value / hist_value
        if ratio > threshold or ratio < 1 / threshold:
            self._add_anomaly(
                '波动异常', field, current_value, code_id,
                f"变化达{ratio:.1f}倍 (历史值:{hist_value:.2f})"
            )

    def check_rating_distribution(self):
        """星级分布检查(修复版)"""
        if not self._has_valid_history():
            return

        # 获取全量评级类别
        all_ratings = sorted(set(self.history_df['customer_star_rating']) |
                             set(self.current_df['customer_star_rating']))

        # 计算观察频数
        current_counts = self.current_df['customer_star_rating'].value_counts()
        current_counts = current_counts.reindex(all_ratings, fill_value=0).values

        # 计算期望比例
        hist_dist = self.history_df['customer_star_rating'].value_counts(normalize=True)
        hist_dist = hist_dist.reindex(all_ratings, fill_value=0).values

        # 计算期望频数
        expected_counts = hist_dist * current_counts.sum()

        # 过滤有效数据
        mask = expected_counts > 0
        filtered_observed = current_counts[mask]
        filtered_expected = expected_counts[mask]

        # 执行卡方检验
        if len(filtered_observed) >= 2 and filtered_expected.sum() > 0:
            try:
                _, p_value = stats.chisquare(filtered_observed, filtered_expected)
                if p_value < 0.05:
                    self._add_anomaly(
                        '分布异常', 'customer_star_rating', None, '全局',
                        f"星级分布显著变化(p={p_value:.4f})"
                    )
            except ValueError as e:
                self._add_anomaly(
                    '检查错误', '全局', None, '全局',
                    f"分布检查失败:{str(e)}"
                )

    def check_rating_deviation(self, code_id, current_rating):
        """星级突变检查"""
        if not self._has_valid_history() or code_id not in self.history_grouped.index:
            return

        hist_rating = self.history_grouped.loc[code_id, 'customer_star_rating']
        if pd.notna(hist_rating) and abs(current_rating - hist_rating) >= 3:
            self._add_anomaly(
                '评级突变', 'customer_star_rating', current_rating, code_id,
                f"星级从{hist_rating}→{current_rating}"
            )

    def check_inbound_zero(self, code_id, current_value):
        """入库数突降检查"""
        if not self._has_valid_history() or code_id not in self.history_grouped.index:
            return

        hist_value = self.history_grouped.loc[code_id, 'inbound_num']
        if hist_value > 0 and current_value == 0:
            self._add_anomaly(
                '突降异常', 'inbound_num', current_value, code_id,
                f"入库数从{hist_value}降为0"
            )

    def check_rating_group_anomaly(self):
        """使用IQR方法检测组内异常"""
        for rating, group in self.current_df.groupby('customer_star_rating'):
            if len(group) < 4:
                continue

            values = group['avg_overdue_ratio'].dropna()
            if len(values) < 4:
                continue

            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1

            if iqr < 1e-6:  # 数据无显著离散性
                continue

            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            outliers = group[(values < lower_bound) | (values > upper_bound)]
            for _, row in outliers.iterrows():
                self._add_anomaly(
                    '组内异常', 'avg_overdue_ratio',
                    row['avg_overdue_ratio'],
                    row['customer_code_id'],
                    f"在{rating}星客户中超出IQR范围[{lower_bound:.2f}-{upper_bound:.2f}]"
                )

    def run_all_checks(self):
        """执行全部检查"""
        try:
            # 全局检查
            # self.check_rating_distribution()

            # 逐条检查
            for _, row in self.current_df.iterrows():
                code_id = row['customer_code_id']

                # 基础范围检查
                self.check_value_range('avg_overdue_ratio', (0, 1), code_id, row['avg_overdue_ratio'])

                # 历史波动检查
                self.check_historical_volatility('avg_sales_amount', code_id, row['avg_sales_amount'], 10)
                self.check_historical_volatility('avg_profit_amount', code_id, row['avg_profit_amount'], 10)
                self.check_historical_volatility('vol_turnover_rate', code_id, row['vol_turnover_rate'], 5)

                # 特殊字段检查
                self.check_inbound_zero(code_id, row['inbound_num'])
                self.check_rating_deviation(code_id, row['customer_star_rating'])

            # 分组检查
            self.check_rating_group_anomaly()

        except Exception as e:
            self._add_anomaly('系统错误', '全局', None, '全局', f"检查过程异常:{str(e)}")

        return pd.DataFrame(self.anomalies)


def generate_excel_report(report_df, output_path):
    """生成Excel报告"""
    if report_df.empty:
        report_df = pd.DataFrame(columns=['检查类型', '异常字段', '当前值', '客户ID', '详情'])

    with pd.ExcelWriter(output_path) as writer:
        # 异常摘要
        summary = report_df.groupby(['检查类型', '异常字段']).agg({
            '客户ID': 'nunique',
            '详情': lambda x: x.iloc[-1] if len(x) > 0 else ''
        }).reset_index()
        summary.columns = ['检查类型', '异常字段', '影响客户数', '最新异常示例']
        summary.to_excel(writer, sheet_name='异常摘要', index=False)

        # 详细记录
        report_df.to_excel(writer, sheet_name='详细异常', index=False)

        # 格式优化
        workbook = writer.book
        header_format = workbook.add_format({
            'bold': True,
            'bg_color': '#FFE4B5',
            'border': 1
        })

        for sheet in writer.sheets.values():
            sheet.autofilter(0, 0, 0, len(report_df.columns) - 1)
            sheet.freeze_panes(1, 0)
            for col_num, value in enumerate(report_df.columns):
                sheet.write(0, col_num, value, header_format)
            sheet.autofit()

# ======================
# 使用示例
# ======================
if __name__ == "__main__":
    # 加载数据(示例路径)
    current_df = pd.read_excel("files/data_current.xlsx")
    try:
        history_df = pd.read_excel("files/data_history.xlsx")
    except:
        history_df = None
    print(current_df)
    print(history_df)

    # 执行校验
    validator = DataValidator(current_df, history_df)
    anomaly_report = validator.run_all_checks()

    # 生成报告
    # 生成报告
    report_path = f"客户数据质量报告_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"
    generate_excel_report(anomaly_report, report_path)
    print(f"生成报告:{report_path}")



相关文章

今天我学习了Python数据统计分析教程,把笔记分享出来

一、环境搭建1. 安装 Python :从官网下载适合你操作系统的版本并安装,建议勾选 “Add Python to PATH” 选项。2. 安装相关库 :常用的有 NumPy(数值计算)、Panda...

Python数据分析(三)

续接Python分析,本篇主要是关于python中一些高阶函数的应用以下是针对你提到的几个高级知识点(数据合并、apply、iterrows、agg、map)的详细解释和案例,帮助你系统学习和理解这些...

Python 数据分析必学的 10 个核心库:从基础操作到高阶建模全攻略

在Python数据分析领域,掌握核心工具库能让你效率倍增。本文精选10个高实用性库,结合代码示例详解从数据处理到机器学习的全流程操作,助你快速进阶数据分析高手。一、Pandas:结构化数据处理的全能选...

工业数据分析工具的力量:用Python释放数据的潜能

阅读文章前辛苦您点下“关注”,方便讨论和分享,为了回馈您的支持,我将每日更新优质内容。如需转载请附上本文源链接!工业数据是现代制造业和工程领域的重要资源。从设备的实时运行指标到生产线的整体效率数据,工...

Python数据分析实战:以数据分析岗为例,探索行业与薪资关联性

金三银四,数据分析师成为众多行业竞相追逐的热门岗位,想知道如何在这个领域精准发力、脱颖而出吗?今天,我将以 BOSS 直聘上的数据为样本,借助 Python 强大的数据分析能力,深度剖析各个行业与薪资...