Python数据分析(四)
现在上手一个数据分析实际案例:
从数据结果集本身入手,使用通用性检查分析数据+业务规则校验是否有异常,最终形成分析报告,若有问题及时做出人工干预
背景:数据最终到应用层面,会经过很多前面很多节点处理,包括数据清洗、数据转换、数据统计等等,这里面步骤有可能做了,也可能没有做,最终输出的结果在经过验证阶段,即使在验证那一刻保证了数据逻辑是正确的,也很难保证后续的每次跑数都不会出现问题。很多可能都会造成问题,包括数据贴源层数据的丢失,数据的延迟,调度的失败,依赖冲突,逻辑的误改动,后面到应用端节点数据接口问题等等,长期我们有关于数据处理链路可靠性保障方案,短期或者临时的话,也需要能短期能实现使用的方案,于是就有了这通过python来分析结果的一个案例
在已经有了python的编程知识了,结合Deepseek之后,输出分析案例的效率大大提高,我们现在差的也就是解决方案的思路了,具体的代码能力已经可以交给Deepseek来实现,而我要做的就是能对代码进行实际的调试使用,以达到最终目的
我开始对Deepseek的从前面进行一些背景的介绍和问题的引入后,最后实际编码前,问了Deepseek
好的,我清楚了,我现在要面临的业务场景是,结果集数据一年跑四次,频率极低,我打算在每次正式使用前几天就跑出这个结果,进行异常检查,然后生成分析报告,目前我能想到的,是对某些值进行一个值范围的校验,比如逾期率,只会在0%-100%,比如字段为空就是异常的校验,比如某个库存平均值,超出阈值水平的校验,比如现有的数据条目和历史上次数据的一个比较,异常的检出,我这边需要将这些规则通过python代码的方式进行处理,然后生成一份excel,第一个sheet有异常分析描述,后面的第二个sheet是记录了异常检查数据,请你根据我上方的描述,生成一份完整的python代码
Deepseek回复了个大体的框架,也提供了实际的代码后,我觉得还有必要再细化下我的规则,以及能提供更详细的模拟数据。于是我准备了两份数据,以及数据内各个字段的校验规则
对关键字段的校验规则,请按如下实现
group_customer_code
customer_code_id
customer_code
对应关系如下: customer_code_id为集团客户ID,与customer_code关系是一对多; 每个customer_code的指标数据除了credit_ccoefficient,其他指标值都会赋一样的值
对基础数据字段的检查,请给我分类抽象到方法
avg_sales_amount:主要是和历史数据比较,对比之前的数据高于10倍或者,低于10倍都是属于异常数据
avg_profit_amount:主要是和历史数据比较,对比之前的数据高于10倍或者,低于10倍都是属于异常数据
vol_turnover_rate:主要是和历史数据比较,对比之前的数据高于5倍或者,低于5倍都是属于异常数据
avg_overdue_ratio:有效值范围在0 - 1,其他是属于异常;在同一customer_star_rating,数值属于偏差太极端的
inbound_num:历史数据值 > 0, 当前数据变成0
为异常数据, customer_star_rating:当前数据比较历史数据值波动太大,比如从1跨多级到4、5、6
这种或者6级变成1级这种有异常;整体各个星级的占比情况波动太大,可能整体数据会有异常
Deepseek给出的代码后,我复制到我的编译器,调整了文件的获取地址,执行后,出了卡方检验报错问题,我觉得这并非必要,注释不解决了。后面代码运行成功,在这个基础上,改变了模拟数据,构造验证代码是否正确能覆盖校验场景的数据,一顿操作下来,全命中了校验场景。效果还是很不错了
从一开始的思路,到最终实现,出了个excel报告,中间也有些事情耽搁,大概花了2个小时,这效率还是蛮高的了。
最后附上代码
import pandas as pd
import numpy as np
from datetime import datetime
from scipy import stats
class DataValidator:
def __init__(self, current_data, history_data=None):
# 数据校验
self._validate_input_data(current_data, history_data)
self.current_df = current_data
self.history_df = history_data
self.anomalies = []
# 预处理历史数据
self.history_grouped = None
if self._has_valid_history():
self._prepare_history_data()
def _validate_input_data(self, current, history):
"""校验输入数据完整性"""
required_columns = {
'customer_code_id', 'customer_star_rating',
'avg_sales_amount', 'avg_profit_amount',
'vol_turnover_rate', 'avg_overdue_ratio',
'inbound_num'
}
if current.empty:
raise ValueError("当前数据集不能为空")
if not required_columns.issubset(current.columns):
missing = required_columns - set(current.columns)
raise ValueError(f"当前数据缺少必要字段:{missing}")
if history is not None and not history.empty:
if not required_columns.issubset(history.columns):
missing = required_columns - set(history.columns)
raise ValueError(f"历史数据缺少必要字段:{missing}")
def _has_valid_history(self):
"""检查是否有有效历史数据"""
return self.history_df is not None and not self.history_df.empty
def _prepare_history_data(self):
"""预处理历史数据(带异常保护)"""
try:
self.history_grouped = self.history_df.groupby('customer_code_id').agg({
'avg_sales_amount': 'mean',
'avg_profit_amount': 'mean',
'vol_turnover_rate': 'mean',
'avg_overdue_ratio': 'mean',
'inbound_num': 'mean',
'customer_star_rating': lambda x: x.mode()[0] if not x.empty else np.nan
})
except KeyError as e:
raise ValueError(f"历史数据字段异常:{str(e)}") from e
def _add_anomaly(self, check_type, field, value, code_id, detail):
"""统一记录异常"""
self.anomalies.append({
'检查类型': check_type,
'异常字段': field,
'当前值': value,
'客户ID': code_id,
'详情': detail
})
def check_value_range(self, field, valid_range, code_id, current_value):
"""范围检查"""
if not (valid_range[0] <= current_value <= valid_range[1]):
self._add_anomaly(
'范围异常', field, current_value, code_id,
f"{field}超出有效范围[{valid_range[0]}-{valid_range[1]}]"
)
def check_historical_volatility(self, field, code_id, current_value, threshold):
"""历史波动检查"""
if not self._has_valid_history() or code_id not in self.history_grouped.index:
return
hist_value = self.history_grouped.loc[code_id, field]
if pd.isna(hist_value) or hist_value == 0:
return
ratio = current_value / hist_value
if ratio > threshold or ratio < 1 / threshold:
self._add_anomaly(
'波动异常', field, current_value, code_id,
f"变化达{ratio:.1f}倍 (历史值:{hist_value:.2f})"
)
def check_rating_distribution(self):
"""星级分布检查(修复版)"""
if not self._has_valid_history():
return
# 获取全量评级类别
all_ratings = sorted(set(self.history_df['customer_star_rating']) |
set(self.current_df['customer_star_rating']))
# 计算观察频数
current_counts = self.current_df['customer_star_rating'].value_counts()
current_counts = current_counts.reindex(all_ratings, fill_value=0).values
# 计算期望比例
hist_dist = self.history_df['customer_star_rating'].value_counts(normalize=True)
hist_dist = hist_dist.reindex(all_ratings, fill_value=0).values
# 计算期望频数
expected_counts = hist_dist * current_counts.sum()
# 过滤有效数据
mask = expected_counts > 0
filtered_observed = current_counts[mask]
filtered_expected = expected_counts[mask]
# 执行卡方检验
if len(filtered_observed) >= 2 and filtered_expected.sum() > 0:
try:
_, p_value = stats.chisquare(filtered_observed, filtered_expected)
if p_value < 0.05:
self._add_anomaly(
'分布异常', 'customer_star_rating', None, '全局',
f"星级分布显著变化(p={p_value:.4f})"
)
except ValueError as e:
self._add_anomaly(
'检查错误', '全局', None, '全局',
f"分布检查失败:{str(e)}"
)
def check_rating_deviation(self, code_id, current_rating):
"""星级突变检查"""
if not self._has_valid_history() or code_id not in self.history_grouped.index:
return
hist_rating = self.history_grouped.loc[code_id, 'customer_star_rating']
if pd.notna(hist_rating) and abs(current_rating - hist_rating) >= 3:
self._add_anomaly(
'评级突变', 'customer_star_rating', current_rating, code_id,
f"星级从{hist_rating}→{current_rating}"
)
def check_inbound_zero(self, code_id, current_value):
"""入库数突降检查"""
if not self._has_valid_history() or code_id not in self.history_grouped.index:
return
hist_value = self.history_grouped.loc[code_id, 'inbound_num']
if hist_value > 0 and current_value == 0:
self._add_anomaly(
'突降异常', 'inbound_num', current_value, code_id,
f"入库数从{hist_value}降为0"
)
def check_rating_group_anomaly(self):
"""使用IQR方法检测组内异常"""
for rating, group in self.current_df.groupby('customer_star_rating'):
if len(group) < 4:
continue
values = group['avg_overdue_ratio'].dropna()
if len(values) < 4:
continue
q1 = values.quantile(0.25)
q3 = values.quantile(0.75)
iqr = q3 - q1
if iqr < 1e-6: # 数据无显著离散性
continue
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outliers = group[(values < lower_bound) | (values > upper_bound)]
for _, row in outliers.iterrows():
self._add_anomaly(
'组内异常', 'avg_overdue_ratio',
row['avg_overdue_ratio'],
row['customer_code_id'],
f"在{rating}星客户中超出IQR范围[{lower_bound:.2f}-{upper_bound:.2f}]"
)
def run_all_checks(self):
"""执行全部检查"""
try:
# 全局检查
# self.check_rating_distribution()
# 逐条检查
for _, row in self.current_df.iterrows():
code_id = row['customer_code_id']
# 基础范围检查
self.check_value_range('avg_overdue_ratio', (0, 1), code_id, row['avg_overdue_ratio'])
# 历史波动检查
self.check_historical_volatility('avg_sales_amount', code_id, row['avg_sales_amount'], 10)
self.check_historical_volatility('avg_profit_amount', code_id, row['avg_profit_amount'], 10)
self.check_historical_volatility('vol_turnover_rate', code_id, row['vol_turnover_rate'], 5)
# 特殊字段检查
self.check_inbound_zero(code_id, row['inbound_num'])
self.check_rating_deviation(code_id, row['customer_star_rating'])
# 分组检查
self.check_rating_group_anomaly()
except Exception as e:
self._add_anomaly('系统错误', '全局', None, '全局', f"检查过程异常:{str(e)}")
return pd.DataFrame(self.anomalies)
def generate_excel_report(report_df, output_path):
"""生成Excel报告"""
if report_df.empty:
report_df = pd.DataFrame(columns=['检查类型', '异常字段', '当前值', '客户ID', '详情'])
with pd.ExcelWriter(output_path) as writer:
# 异常摘要
summary = report_df.groupby(['检查类型', '异常字段']).agg({
'客户ID': 'nunique',
'详情': lambda x: x.iloc[-1] if len(x) > 0 else ''
}).reset_index()
summary.columns = ['检查类型', '异常字段', '影响客户数', '最新异常示例']
summary.to_excel(writer, sheet_name='异常摘要', index=False)
# 详细记录
report_df.to_excel(writer, sheet_name='详细异常', index=False)
# 格式优化
workbook = writer.book
header_format = workbook.add_format({
'bold': True,
'bg_color': '#FFE4B5',
'border': 1
})
for sheet in writer.sheets.values():
sheet.autofilter(0, 0, 0, len(report_df.columns) - 1)
sheet.freeze_panes(1, 0)
for col_num, value in enumerate(report_df.columns):
sheet.write(0, col_num, value, header_format)
sheet.autofit()
# ======================
# 使用示例
# ======================
if __name__ == "__main__":
# 加载数据(示例路径)
current_df = pd.read_excel("files/data_current.xlsx")
try:
history_df = pd.read_excel("files/data_history.xlsx")
except:
history_df = None
print(current_df)
print(history_df)
# 执行校验
validator = DataValidator(current_df, history_df)
anomaly_report = validator.run_all_checks()
# 生成报告
# 生成报告
report_path = f"客户数据质量报告_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"
generate_excel_report(anomaly_report, report_path)
print(f"生成报告:{report_path}")