Commit 2bb3bf7d authored by 舒皓月

2019-09-03

parent 91634d46
# coding=utf-8
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score
import pymysql
import pymongo
import os
import pickle
import warnings
import datetime
from dateutil.relativedelta import relativedelta
from collections import OrderedDict
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['savefig.dpi'] = 150
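# SimHei supplies a CJK font so the Chinese titles and legends render correctly;
# disabling unicode_minus avoids broken minus signs under that font.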
class AUCMonitor:
'''
Monitor model AUC over fixed time windows.
'''
def __init__(self, excel_path='./model_score.xlsx', sheet_name='model',
passdue_day=15, save_path='./auc/',
min_user_group=500, interval_days=15, min_auc=0.55,
date_list=('2019-03-01', '2019-03-15', '2019-03-31', '2019-04-15'),
if_save=True, if_read=True):
# The database configuration rarely changes, so it is hard-coded rather than exposed as constructor arguments.
self.mysql_engine = pymysql.connect(host='172.20.6.9',
port=9030,
user='fengkong_read_only',
passwd='mT2HFUgI',
db='risk_analysis',
charset='utf8')
self.mongo_client = pymongo.MongoClient(
"mongodb://haoyue.shu:x2egwRHk7WhQ4So1@172.18.3.22:27017/?authSource=rc_mgo_feature_dp")
self.mongo_db = self.mongo_client['rc_mgo_feature_dp']
self.mongo_table = self.mongo_db['wf_audit_log_with_feature']
# Read the model metadata maintained in the Excel sheet.
self.field_info_df = pd.read_excel(excel_path, sheet_name=sheet_name)
self.field_name_list = self.field_info_df.field_name.tolist()
self.field_query_list = self.field_info_df.field_query.tolist()
self.field_app_type_list = self.field_info_df.app_type.tolist()
self.field_app_type_list = [str(x) for x in self.field_app_type_list]
self.field_DB_list = self.field_info_df.DB.tolist()
self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list))
## Dry-run (shadow-mode) information.
self.na_time = self.field_info_df.na_time.tolist() # dry-run period
self.na_app_type = self.field_info_df.na_app_type.tolist() # dry-run application types
self.na_app_chan = self.field_info_df.na_app_chan.tolist() # dry-run channels
# Constants.
self.passdue_day = passdue_day # overdue-day threshold, default 15.
self.save_path = save_path # output directory, default './auc/'.
self.min_user_group = min_user_group * interval_days / 30 # minimum user-group size, scaled to the window length.
self.min_auc = min_auc # minimum acceptable AUC; lower values are flagged as abnormal.
# Data to be pulled from the databases.
self.mysql_df = None
self.mongo_df = None
self.merge_data = None
# Time windows.
self.date_list = date_list
self.date_range_list = []
for i in range(len(self.date_list) - 1):
self.date_range_list.append(self.date_list[i] + ' ~ ' + self.date_list[i + 1])
# Collected statistics.
auc_cols = ['model_name', 'app_type', 'app_chan', 'group_name']
for i in range(len(date_list) - 1):
auc_cols.append(self.date_list[i] + ' ~ ' + self.date_list[i + 1] + 'NUM')
auc_cols.append(self.date_list[i] + ' ~ ' + self.date_list[i + 1] + 'AUC')
self.auc_info_df = pd.DataFrame(columns=auc_cols)
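# With the default date_list the columns look like: ['model_name', 'app_type', 'app_chan',
# 'group_name', '2019-03-01 ~ 2019-03-15NUM', '2019-03-01 ~ 2019-03-15AUC', ...].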
# Data read/write mode.
self.if_save = if_save # whether to save the data extracted from the databases.
self.if_read = if_read # whether to read from the databases (otherwise load the saved CSVs).
# Create the output directories.
if not os.path.exists(self.save_path):
os.mkdir(self.save_path)
if not os.path.exists(self.save_path + 'image/'):
os.mkdir(self.save_path + 'image/')
if not os.path.exists(self.save_path + 'data/'):
os.mkdir(self.save_path + 'data/')
if not os.path.exists(self.save_path + 'info/'):
os.mkdir(self.save_path + 'info/')
def query_mysql(self, sql):
'''
Query MySQL and return the result as a DataFrame.
:param sql: str.
:return: DataFrame.
'''
try:
return pd.read_sql(sql, self.mysql_engine)
except:
print('SQL查询出现错误.')
def query_mongo(self, condition, fields):
'''
Query MongoDB and return the result as a DataFrame.
:param condition: dict
:param fields: dict
:return: DataFrame
'''
try:
return pd.DataFrame(list(self.mongo_table.find(condition, fields)))
except:
print('Mongo查询出现错误.')
def int2str(self, x):
'''
Convert an int to a zero-padded two-digit string, used for dates.
e.g. 5 --> 05
:param x: int
:return: str.
'''
if x >= 10:
return str(x)
else:
return '0' + str(x)
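# Equivalent to str(x).zfill(2) for the non-negative ints used here.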
def filter_data(self, df, field):
'''
Filter out dry-run (shadow-mode) records.
:param df: DataFrame.
:param field: str, field name.
:return: DataFrame, the filtered data.
'''
df = df[~((df['applied_type'] == 1) & (df['applied_channel'].apply(lambda x: 'Android' in x)))]
field_idx = self.field_query_list.index(field)
na_time = self.na_time[field_idx]
na_type = self.na_app_type[field_idx]
na_chan = self.na_app_chan[field_idx]
if pd.isnull(na_time): # No dry-run period recorded, keep everything.
return df
# Dry-run period.
t_s, t_e = na_time.split('~')
if len(t_e) == 0: # An open-ended end date means the model is still in dry run; drop everything.
return pd.DataFrame()
else:
na_df = df[
(df['applied_at'].apply(lambda x: x[:10] >= t_s)) & (df['applied_at'].apply(lambda x: x[:10] <= t_e))]
if na_df.shape[0] == 0:
return df
# Application type.
if pd.isnull(na_type):
return df[~df.index.isin(na_df.index.values)]
else:
tmp_df = pd.DataFrame()
for i in str(int(na_type)):
tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)])
na_df = tmp_df
if na_df.shape[0] == 0:
return df
# Application channel.
if pd.isnull(na_chan):
return df[~df.index.isin(na_df.index.values)]
else:
tmp_df = pd.DataFrame()
for i in na_chan.split(','):
tmp_df = tmp_df.append(na_df[na_df['applied_channel'].apply(lambda x: i in x)])
na_df = tmp_df
if na_df.shape[0] == 0:
return df
return df[~df.index.isin(na_df.index.values)]
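# Illustrative dry-run metadata (hypothetical values): na_time = '2019-03-01~2019-03-10'
# (an open-ended '2019-03-01~' means still in dry run), na_app_type = 12 (application
# types 1 and 2), na_app_chan = 'xiaomi,oppo' (comma-separated substrings matched
# against applied_channel).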
def helper_auc(self, user_group_name=None, df=None, info_dict=None, field=None):
'''
Extract per-window statistics for one user group.
:param user_group_name: str, user-group name.
:param df: DataFrame, data for that user group.
:return: None.
'''
print('正在处理%s客群数据.' % user_group_name)
info_dict[user_group_name] = OrderedDict()
date_range_list = list(sorted(df['date_label'].unique().tolist()))
if '' in date_range_list:
date_range_list.remove('')
df_g = df.groupby(['date_label']).agg({'overdue': ['count', 'sum', 'mean']})
df_g.columns = ['_'.join(x) for x in df_g.columns.ravel()]
df_g = df_g.reset_index()
df_g = df_g.sort_values(['date_label'])
for i, m in enumerate(date_range_list):
amt = df_g.loc[df_g['date_label'] == m, 'overdue_count'].values[0]
# Optionally skip windows whose sample size falls below the threshold (disabled for AUC).
# if amt < self.min_user_group:
# print('%s样本量过小, 放弃提取信息.' % m)
# continue
info_dict[user_group_name][m] = {}
info_dict[user_group_name][m]['NUM'] = amt
info_dict[user_group_name][m]['overdue_ratio'] = df_g.loc[df_g['date_label'] == m, 'overdue_mean'].values[0]
print('%s样本量: %d' % (m, amt))
try:
info_dict[user_group_name][m]['AUC'] = roc_auc_score(
df.loc[(df['date_label'] == m) & (df[field].notna()), 'overdue'],
df.loc[(df['date_label'] == m) & (df[field].notna()), field])
except:
info_dict[user_group_name][m]['AUC'] = np.nan
print('AUC计算发生错误.')
print('处理完成.')
print('=' * 40)
def plot_auc(self, field):
# Slice out the columns we need.
df_copy = self.merge_data[
[field, 'date_label', 'applied_type', 'applied_channel', 'overdue', 'passdue_day', 'applied_at']].copy()
# Keep only funded loans with observed overdue performance.
df_copy = df_copy[(df_copy['overdue'].notna()) & (df_copy[field].notna())]
# Keep only the application types this model applies to.
tmp_df = pd.DataFrame()
for i in self.field_query_app_type_dict[field]:
tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)])
df_copy = tmp_df
# Filter out dry-run data.
df_copy = self.filter_data(df_copy, field)
if df_copy.shape[0] == 0:
print('仍在空跑.')
return None
# Normalize the timestamp format.
if df_copy['applied_at'].dtype == object:
df_copy = df_copy.loc[
(df_copy[field].notna()) & (df_copy['applied_at'].apply(lambda x: x[:10]) <= self.date_list[-1]) & (
df_copy[field] > 0) & (df_copy['passdue_day'].notna())]
else:
df_copy = df_copy.loc[(df_copy[field].notna()) & (
df_copy['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d')) <= self.date_list[-1]) & (
df_copy[field] > 0) & (df_copy['passdue_day'].notna())]
# Dictionary holding all extracted statistics, e.g.:
# {'全样本':
#     {'period_0':
#         {'NUM': int,
#          'overdue_ratio': float,
#          'AUC': float},
#      'period_1':
#         {'NUM': int,
#          'overdue_ratio': float,
#          'AUC': float}}}
info_dict = {}
# Full sample.
self.helper_auc('全样本', df_copy, info_dict, field)
# Split by application type.
self.helper_auc('首申-全渠道', df_copy.loc[df_copy['applied_type'] == 1], info_dict, field)
self.helper_auc('复申-全渠道', df_copy.loc[df_copy['applied_type'] == 2], info_dict, field)
self.helper_auc('复贷-全渠道', df_copy.loc[df_copy['applied_type'] == 3], info_dict, field)
# Split by major user group.
## User-group partition, e.g. user_group_dict = {'首申-融360': (1, '融360')}
user_group_dict = {}
app_type_dict = {1: '首申', 2: '复申', 3: '复贷'}
df_copy_g = df_copy.groupby(['applied_type', 'applied_channel'])[field].count().sort_values(ascending=False)
df_copy_g = df_copy_g.reset_index()
## Drop small user groups.
df_copy_g = df_copy_g.loc[df_copy_g[field] > self.min_user_group]
app_type_set = df_copy_g['applied_type'].unique()
app_chan_set = df_copy_g['applied_channel'].unique()
for app_type in app_type_set:
for app_chan in app_chan_set:
if df_copy_g.loc[
(df_copy_g['applied_type'] == app_type) & (df_copy_g['applied_channel'] == app_chan)].shape[0] != 0:
user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan)
del df_copy_g
## Process each partitioned user group.
for user_group_name in user_group_dict:
self.helper_auc(user_group_name,
df_copy.loc[(df_copy['applied_type'] == user_group_dict[user_group_name][0]) & (
df_copy['applied_channel'] == user_group_dict[user_group_name][1])], info_dict,
field)
# Drop user groups that yielded no statistics.
remove_list = []
for user_group_name in info_dict:
if not info_dict[user_group_name]:
remove_list.append(user_group_name)
for user_group_name in remove_list:
del info_dict[user_group_name]
# Plot.
print('开始画图.')
print('=' * 40)
for user_group_name in info_dict:
print(self.field_query_name_dict[field] + '-' + user_group_name)
plt.figure(figsize=(16, 8))
auc_list = []
num_list = []
overdue_ratio_list = []
label = ''
for m in self.date_range_list:
if m in info_dict[user_group_name]:
auc_list.append(info_dict[user_group_name][m]['AUC'])
num_list.append(info_dict[user_group_name][m]['NUM'])
overdue_ratio_list.append(info_dict[user_group_name][m]['overdue_ratio'])
label = label + '%s AUC: %.3f 样本量: %d\n' % (m, info_dict[user_group_name][m]['AUC'],
info_dict[user_group_name][m]['NUM'])
else:
auc_list.append(np.nan)
num_list.append(np.nan)
overdue_ratio_list.append(np.nan)
label = label + '%s AUC: %s 样本量: %s\n' % (m, 'NaN', 'NaN')
plt.plot(range(len(self.date_range_list)),
auc_list, '*--',
label=label)
plt.legend(loc='upper right', fontsize=18)
plt.title(self.field_query_name_dict[field] + '-' + user_group_name, fontdict={'fontsize': 40})
plt.subplots_adjust(left=0.03, right=0.99, top=0.91, bottom=0.03)
plt.savefig(self.save_path + 'image/' + self.field_query_name_dict[field] + '-' + user_group_name)
plt.show()
# Record the statistics.
def app_type(data):
data = data.split('-')
if len(data) == 1:
return '全类型'
else:
return data[0]
def app_channel(data):
    parts = data.split('-')[1:]
    if not parts:  # No channel suffix (e.g. '全样本') means the full-channel group.
        return '全渠道'
    return '-'.join(parts)
for user_group_name in info_dict:
tmp_dict = {'model_name': [self.field_query_name_dict[field]],
'app_type': [app_type(user_group_name)],
'app_chan': [app_channel(user_group_name)],
'group_name': [user_group_name]}
for m in info_dict[user_group_name]:
tmp_dict[m + 'NUM'] = [int(info_dict[user_group_name][m]['NUM'])]
tmp_dict[m + 'AUC'] = [round(info_dict[user_group_name][m]['AUC'], 3)]
self.auc_info_df = self.auc_info_df.append(pd.DataFrame(tmp_dict))
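# Flags a group as abnormal when its AUC in any window falls below min_auc (default 0.55).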
def abnormal_auc(self):
def is_abnormal_auc(data):
for i in data.index:
if 'AUC' in i and pd.notna(data[i]) and data[i] < self.min_auc:
return True
return False
self.auc_info_df['is_abnormal'] = self.auc_info_df.apply(is_abnormal_auc, axis=1)
def run(self):
# Pull MySQL data.
if self.if_read:
mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
real_mysql_field = []
for field in mysql_field:
tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
if tmp_df is not None:
real_mysql_field.append(field)
print('在MySQL中找到该字段: %s' % self.field_query_name_dict[field])
else:
print('在MySQL中不存在该字段: %s' % self.field_query_name_dict[field])
# Drop the missing field from all bookkeeping lists.
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_app_type_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
del self.field_query_app_type_dict[field]
if len(real_mysql_field) > 0:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, %s, \
applied_from, applied_channel, transacted, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (','.join(real_mysql_field), self.date_list[0], self.date_list[-1]))
else:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, transacted, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (self.date_list[0], self.date_list[-1]))
print('MySQL数据获取成功.')
if self.if_save:
self.mysql_df.to_csv(self.save_path + 'data/mysql_data.csv', index=False)
else:
self.mysql_df = pd.read_csv(self.save_path + 'data/mysql_data.csv')
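# func_0 maps wf_loan_type (0-based) to applied_type (1-based); non-numeric values become NaN.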
def func_0(data):
try:
return int(int(data) + 1)
except:
return np.nan
# Pull MongoDB data.
if self.if_read:
condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.date_list[0],
'$lte': '%s 00:00:00' % self.date_list[-1]}}
fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1}
for f in self.field_query_list: # Add the model-score fields preconfigured in the Excel sheet.
fields[f] = 1
self.mongo_df = self.query_mongo(condition, fields)
self.mongo_df['applied_type'] = self.mongo_df['wf_loan_type'].apply(func_0)
del self.mongo_df['wf_loan_type']
print('MongoDB数据获取成功.')
if self.if_save:
self.mongo_df.to_csv(self.save_path + 'data/mongo_data.csv', index=False)
else:
self.mongo_df = pd.read_csv(self.save_path + 'data/mongo_data.csv')
self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]
# Deduplicate MySQL rows, keeping the smallest passdue_day per order.
self.mysql_df = self.mysql_df.sort_values('passdue_day')
self.mysql_df = self.mysql_df.drop_duplicates('order_no', keep='first')
print('数据去重完成.')
# Join the two sources.
self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df,
left_on='order_no', right_on='wf_biz_no', how='inner')
print('数据拼接完成.')
## Define overdue users.
def overdue(data):
if pd.isnull(data):
return np.nan
else:
return float(data > self.passdue_day)
self.merge_data['overdue'] = self.merge_data['passdue_day'].apply(overdue)
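# e.g. with passdue_day=15: a loan 20 days past due gets overdue = 1.0, one 10 days
# past due gets 0.0, and loans without performance data stay NaN.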
# Normalize timestamps to a uniform string format.
if self.merge_data['applied_at'].dtype == object:
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x[:10])
else:
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))
# Clean the score fields.
def clean_data(data):
try:
return float(data)
except:
return np.nan
na_field_list = []
for field in self.field_query_list:
if field in self.merge_data.columns.tolist():
print('正在清洗%s' % self.field_query_name_dict[field])
self.merge_data[field] = self.merge_data[field].apply(clean_data)
else:
na_field_list.append(field)
## Remove fields that could not be extracted.
print('不包含以下字段:')
for field in na_field_list:
print(self.field_query_name_dict[field])
self.field_query_list.remove(field)
self.field_name_list.remove(self.field_query_name_dict[field])
del self.field_query_name_dict[field]
# Label each row with its time window.
self.merge_data['date_label'] = ''
for i in range(len(self.date_list) - 1):
self.merge_data.loc[
(self.merge_data['applied_at'] >= '%s' % self.date_list[i]) &
(self.merge_data['applied_at'] < '%s' % self.date_list[i + 1]),
'date_label'] = self.date_list[i] + ' ~ ' + self.date_list[i + 1]
# Plot.
print('开始画图-AUC.')
for field in self.field_query_list:
self.plot_auc(field)
# Flag abnormal groups.
self.abnormal_auc()
# Save the statistics.
self.auc_info_df.to_csv(self.save_path + 'info/auc_info.csv', index=False)
print('统计信息保存成功.')
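# A minimal usage sketch (the arguments shown are the class defaults; the Excel sheet
# and database access are assumed to be available):
if __name__ == '__main__':
    auc_monitor = AUCMonitor(excel_path='./model_score.xlsx', sheet_name='model',
                             passdue_day=15, save_path='./auc/',
                             min_user_group=500, interval_days=15, min_auc=0.55,
                             date_list=('2019-03-01', '2019-03-15', '2019-03-31', '2019-04-15'),
                             if_save=True, if_read=True)
    auc_monitor.run()  # pulls the data, plots per-group AUC trends, writes ./auc/info/auc_info.csv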
# coding=utf-8
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score
import pymysql
import pymongo
import os
import pickle
import warnings
import datetime
from dateutil.relativedelta import relativedelta
from collections import OrderedDict
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['savefig.dpi'] = 150
class PSIMonitor:
'''
Monitor model-score PSI drift over fixed time windows.
'''
def __init__(self, excel_path='./model_score.xlsx', sheet_name='model',
min_user_group=500, max_psi=0.1, interval_days=15,
save_path='./psi/',
if_save=True, if_read=True,
date_list=('2019-03-01', '2019-03-15', '2019-03-31', '2019-04-15')):
# The database configuration rarely changes, so it is hard-coded rather than exposed as constructor arguments.
self.mysql_engine = pymysql.connect(host='172.20.6.9',
port=9030,
user='fengkong_read_only',
passwd='mT2HFUgI',
db='risk_analysis',
charset='utf8')
self.mongo_client = pymongo.MongoClient(
"mongodb://haoyue.shu:x2egwRHk7WhQ4So1@172.18.3.22:27017/?authSource=rc_mgo_feature_dp")
self.mongo_db = self.mongo_client['rc_mgo_feature_dp']
self.mongo_table = self.mongo_db['wf_audit_log_with_feature']
# Read the model metadata maintained in the Excel sheet.
self.field_info_df = pd.read_excel(excel_path, sheet_name=sheet_name)
self.field_name_list = self.field_info_df.field_name.tolist()
self.field_query_list = self.field_info_df.field_query.tolist()
self.field_app_type_list = self.field_info_df.app_type.tolist()
self.field_app_type_list = [str(x) for x in self.field_app_type_list]
self.field_DB_list = self.field_info_df.DB.tolist()
self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list))
## Dry-run (shadow-mode) information.
self.na_time = self.field_info_df.na_time.tolist() # dry-run period
self.na_app_type = self.field_info_df.na_app_type.tolist() # dry-run application types
self.na_app_chan = self.field_info_df.na_app_chan.tolist() # dry-run channels
# Constants.
self.min_user_group = min_user_group * interval_days / 30 # minimum user-group size, scaled to the window length.
self.max_psi = max_psi # maximum acceptable PSI; higher values are flagged as abnormal.
# Time windows.
self.date_list = date_list
# Data to be pulled from the databases.
self.mysql_df = None
self.mongo_df = None
self.merge_data = None
# Collected statistics.
psi_cols = ['field_query', 'group_name']
for i in range(len(date_list) - 1):
psi_cols.append(self.date_list[i] + ' ~ ' + self.date_list[i + 1] + ' NUM: ')
psi_cols.append(self.date_list[i] + ' ~ ' + self.date_list[i + 1] + ' PSI: ')
self.psi_info_df = pd.DataFrame(columns=psi_cols)
# Data read/write mode.
self.if_save = if_save # whether to save the data extracted from the databases.
self.if_read = if_read # whether to read from the databases (otherwise load the saved CSVs).
# Binning scheme.
self.bins = None
# Create the output directories.
self.save_path = save_path
if not os.path.exists(self.save_path):
os.mkdir(self.save_path)
if not os.path.exists(self.save_path + 'image/'):
os.mkdir(self.save_path + 'image/')
if not os.path.exists(self.save_path + 'data/'):
os.mkdir(self.save_path + 'data/')
if not os.path.exists(self.save_path + 'info/'):
os.mkdir(self.save_path + 'info/')
def query_mysql(self, sql):
'''
Query MySQL and return the result as a DataFrame.
:param sql: str.
:return: DataFrame.
'''
try:
return pd.read_sql(sql, self.mysql_engine)
except:
print('SQL查询出现错误.')
def query_mongo(self, condition, fields):
'''
Query MongoDB and return the result as a DataFrame.
:param condition: dict
:param fields: dict
:return: DataFrame
'''
try:
return pd.DataFrame(list(self.mongo_table.find(condition, fields)))
except:
print('Mongo查询出现错误.')
def int2str(self, x):
'''
Convert an int to a zero-padded two-digit string, used for dates.
e.g. 5 --> 05
:param x: int
:return: str.
'''
if x >= 10:
return str(x)
else:
return '0' + str(x)
def make_bin(self, score_list):
'''
Build equal-frequency (decile) bins from the given model scores.
:param score_list: pd.Series
:return: list[num]
'''
score_list = score_list[score_list.notna()]
try:
bins = score_list.quantile([.1, .2, .3, .4, .5, .6, .7, .8, .9]).values.tolist()
bins = [-99999999] + bins + [99999999]
bins = [x for x in bins if pd.notna(x)]
bins = list(sorted(list(set(bins))))
return bins
except:
print('分箱出现错误.')
return None
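# Example (illustrative): for scores spread uniformly over [0, 1], make_bin returns
# roughly [-99999999, 0.1, 0.2, ..., 0.9, 99999999]: the nine decile edges plus two
# open-ended guard values, deduplicated and sorted.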
def calc_psi(self, array_1, array_2):
'''
Compute the PSI between two binned distributions.
:param array_1: array
:param array_2: array
:return: PSI
'''
# Smooth the arrays so empty bins do not break the PSI computation.
array_1 = array_1 + 0.001
array_2 = array_2 + 0.001
try:
psi = ((array_1 - array_2) * np.log10(array_1 / array_2)).sum()
return psi
except:
return None
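# Worked example (illustrative, smoothing ignored for clarity): with benchmark bin
# ratios a = [0.5, 0.5] and current ratios b = [0.8, 0.2],
# PSI = (0.5-0.8)*log10(0.5/0.8) + (0.5-0.2)*log10(0.5/0.2) ≈ 0.061 + 0.119 ≈ 0.18.
# PSI is often defined with the natural log; this class uses log10, so thresholds
# such as max_psi=0.1 apply to the log10 variant.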
def filter_data(self, df, field):
'''
Filter out dry-run (shadow-mode) records.
:param df: DataFrame.
:param field: str, field name.
:return: DataFrame, the filtered data.
'''
df = df[~((df['applied_type'] == 1) & (df['applied_channel'].apply(lambda x: 'Android' in x)))]
field_idx = self.field_query_list.index(field)
na_time = self.na_time[field_idx]
na_type = self.na_app_type[field_idx]
na_chan = self.na_app_chan[field_idx]
if pd.isnull(na_time): # No dry-run period recorded, keep everything.
return df
# Dry-run period.
t_s, t_e = na_time.split('~')
if len(t_e) == 0: # An open-ended end date means the model is still in dry run; drop everything.
return pd.DataFrame()
else:
na_df = df[
(df['applied_at'].apply(lambda x: x[:10] >= t_s)) & (df['applied_at'].apply(lambda x: x[:10] <= t_e))]
if na_df.shape[0] == 0:
return df
# Application type.
if pd.isnull(na_type):
return df[~df.index.isin(na_df.index.values)]
else:
tmp_df = pd.DataFrame()
for i in str(int(na_type)):
tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)])
na_df = tmp_df
if na_df.shape[0] == 0:
return df
# Application channel.
if pd.isnull(na_chan):
return df[~df.index.isin(na_df.index.values)]
else:
tmp_df = pd.DataFrame()
for i in na_chan.split(','):
tmp_df = tmp_df.append(na_df[na_df['applied_channel'].apply(lambda x: i in x)])
na_df = tmp_df
if na_df.shape[0] == 0:
return df
return df[~df.index.isin(na_df.index.values)]
def helper_psi(self, user_group_name=None, df=None, info_dict=None, field=None):
'''
Extract per-window statistics for one user group.
:param user_group_name: str, user-group name.
:param df: DataFrame, data for that user group.
:return: None.
'''
print('正在处理%s客群数据.' % user_group_name)
info_dict[user_group_name] = OrderedDict()
date_list = list(sorted(df['date_label'].unique().tolist()))
if '' in date_list:
date_list.remove('')
df_g = df.groupby(['date_label', 'bins']).agg({field: ['count']})
df_g.columns = ['_'.join(x) for x in df_g.columns.ravel()]
df_g = df_g.reset_index()
df_g = df_g.sort_values(['date_label', 'bins'])
for i, date in enumerate(date_list):
amt_in_bins = df_g.loc[df_g['date_label'] == date, ['bins', field + '_count']]
amt_in_bins = pd.merge(left=self.bins, right=amt_in_bins, on='bins', how='left')
amt_in_bins[field + '_count'] = amt_in_bins[field + '_count'].fillna(0)
amt_in_bins = amt_in_bins[field + '_count'].values
## Skip windows whose sample size falls below the threshold.
if amt_in_bins.sum() < self.min_user_group:
print('%s样本量过小, 放弃提取信息.' % date)
continue
info_dict[user_group_name][date] = {}
info_dict[user_group_name][date]['num'] = amt_in_bins.sum()
info_dict[user_group_name][date]['num_in_bins'] = amt_in_bins
info_dict[user_group_name][date]['ratio_in_bins'] = amt_in_bins / amt_in_bins.sum()
print('%s样本量: %d' % (date, info_dict[user_group_name][date]['num']))
# Compute PSI against the first (benchmark) window.
for i, date in enumerate(info_dict[user_group_name]):
if i == 0:
info_dict[user_group_name][date]['psi'] = 0
bench_bins_ratio = info_dict[user_group_name][date]['ratio_in_bins']
else:
psi = self.calc_psi(bench_bins_ratio, info_dict[user_group_name][date]['ratio_in_bins'])
if psi is not None:
info_dict[user_group_name][date]['psi'] = psi
else:
info_dict[user_group_name][date]['psi'] = -999
print('计算PSI出现错误.')
print('处理完成.')
print('=' * 40)
def plot_psi(self, field):
# Slice out the columns we need.
df_copy = self.merge_data[[field, 'date_label', 'applied_type', 'applied_channel', 'applied_at']].copy()
# Keep only the application types this model applies to.
tmp_df = pd.DataFrame()
for i in self.field_query_app_type_dict[field]:
tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)])
df_copy = tmp_df
df_copy = df_copy[df_copy[field].notna()]
# Filter out dry-run data.
df_copy = self.filter_data(df_copy, field)
if df_copy.shape[0] == 0:
print('仍在空跑.')
return None
# Bin the model scores, using the earliest window with enough data as the benchmark.
bins = None
for i in range(len(self.date_list) - 1):
if df_copy.loc[df_copy['date_label'] == self.date_list[i] + ' ~ ' + self.date_list[i + 1], field].shape[0] \
< self.min_user_group:
print('%s数据过少.' % (self.date_list[i] + ' ~ ' + self.date_list[i + 1]))
else:
bins = self.make_bin(
df_copy.loc[df_copy['date_label'] == self.date_list[i] + ' ~ ' + self.date_list[i + 1], field])
break
if bins is None:
return None
df_copy['bins'] = pd.cut(df_copy[field], bins, precision=8, labels=False) # Assign each score to a bin.
self.bins = pd.Series(df_copy['bins'].unique(), name='bins').sort_values()
self.bins = self.bins.dropna()
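# pd.cut with labels=False yields 0-based integer bin indices; self.bins keeps the
# sorted, NaN-free set of indices actually observed, which helper_psi later reindexes
# each window's counts against.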
# Dictionary holding all extracted statistics, e.g.:
# {'全样本':
#     {'period_0':
#         {'psi': 0,
#          'num': int,
#          'num_in_bins': [...],
#          'ratio_in_bins': [...]},
#      'period_1':
#         {'psi': float,
#          'num': int,
#          'num_in_bins': [...],
#          'ratio_in_bins': [...]}}}
info_dict = {}
# Full sample.
self.helper_psi('全样本', df_copy, info_dict, field)
# Split by application type.
self.helper_psi('首申', df_copy.loc[df_copy['applied_type'] == 1], info_dict, field)
self.helper_psi('复申', df_copy.loc[df_copy['applied_type'] == 2], info_dict, field)
self.helper_psi('复贷', df_copy.loc[df_copy['applied_type'] == 3], info_dict, field)
# Split by major user group.
## User-group partition, e.g. user_group_dict = {'首申-融360': (1, '融360')}
user_group_dict = {}
app_type_dict = {1: '首申', 2: '复申', 3: '复贷'}
df_copy_g = df_copy.groupby(['applied_type', 'applied_channel'])[field].count().sort_values(ascending=False)
df_copy_g = df_copy_g.reset_index()
## Drop small user groups.
df_copy_g = df_copy_g.loc[df_copy_g[field] > self.min_user_group]
app_type_set = df_copy_g['applied_type'].unique()
app_chan_set = df_copy_g['applied_channel'].unique()
for app_type in app_type_set:
for app_chan in app_chan_set:
if df_copy_g.loc[
(df_copy_g['applied_type'] == app_type) & (df_copy_g['applied_channel'] == app_chan)].shape[0] != 0:
user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan)
del df_copy_g
## Process each partitioned user group.
for user_group_name in user_group_dict:
self.helper_psi(user_group_name,
df_copy.loc[(df_copy['applied_type'] == user_group_dict[user_group_name][0]) & (
df_copy['applied_channel'] == user_group_dict[user_group_name][1])], info_dict,
field)
# Drop user groups that yielded no statistics.
remove_list = []
for user_group_name in info_dict:
if not info_dict[user_group_name]:
remove_list.append(user_group_name)
for user_group_name in remove_list:
del info_dict[user_group_name]
# Plot.
print('开始画图.')
print('=' * 40)
for user_group_name in info_dict:
print(self.field_query_name_dict[field] + '-' + user_group_name)
plt.figure(figsize=(16, 8))
for date in info_dict[user_group_name]:
plt.plot(range(len(info_dict[user_group_name][date]['ratio_in_bins'])),
[round(x, 3) for x in info_dict[user_group_name][date]['ratio_in_bins']],
label='%s PSI: %.3f \n 样本量: %d' % (
date, info_dict[user_group_name][date]['psi'], info_dict[user_group_name][date]['num']))
plt.legend(loc='upper right', fontsize=10)
plt.title(self.field_query_name_dict[field] + '-' + user_group_name,
fontdict={'fontsize': 40})
plt.subplots_adjust(left=0.03, right=0.99, top=0.91, bottom=0.03)
plt.savefig(self.save_path + 'image/' + self.field_query_name_dict[field] + '-' + user_group_name)
plt.show()
# Record the statistics.
for user_group_name in info_dict:
tmp_dict = {'field_query': [self.field_query_name_dict[field]],
'group_name': [user_group_name]}
for date in info_dict[user_group_name]:
tmp_dict[date + ' NUM: '] = [int(info_dict[user_group_name][date]['num'])]
tmp_dict[date + ' PSI: '] = [round(info_dict[user_group_name][date]['psi'], 3)]
self.psi_info_df = self.psi_info_df.append(pd.DataFrame(tmp_dict))
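# Flags a group as abnormal when its PSI in any window exceeds max_psi (default 0.1).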
def abnormal_psi(self):
def is_abnormal_psi(data):
for idx in data.index:
if 'PSI' in idx and pd.notna(data[idx]) and data[idx] > self.max_psi:
return True
return False
self.psi_info_df['is_abnormal'] = self.psi_info_df.apply(is_abnormal_psi, axis=1)
def run(self):
# Pull MySQL data.
if self.if_read:
mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
real_mysql_field = []
for field in mysql_field:
tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
if tmp_df is not None:
real_mysql_field.append(field)
print('在MySQL中找到该字段: %s' % self.field_query_name_dict[field])
else:
print('在MySQL中不存在该字段: %s' % self.field_query_name_dict[field])
# Drop the missing field from all bookkeeping lists.
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_app_type_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
del self.field_query_app_type_dict[field]
if len(real_mysql_field) > 0:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, %s, \
applied_from, applied_channel, transacted, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (','.join(real_mysql_field), self.date_list[0], self.date_list[-1]))
else:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, transacted, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (self.date_list[0], self.date_list[-1]))
print('MySQL数据获取成功.')
if self.if_save:
self.mysql_df.to_csv(self.save_path + 'data/mysql_data.csv', index=False)
else:
self.mysql_df = pd.read_csv(self.save_path + 'data/mysql_data.csv')
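# func_0 maps wf_loan_type (0-based) to applied_type (1-based); non-numeric values become NaN.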
def func_0(data):
try:
return int(int(data) + 1)
except:
return np.nan
# Pull MongoDB data.
if self.if_read:
condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.date_list[0],
'$lte': '%s 00:00:00' % self.date_list[-1]}}
fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1}
for f in self.field_query_list: # Add the model-score fields preconfigured in the Excel sheet.
fields[f] = 1
self.mongo_df = self.query_mongo(condition, fields)
self.mongo_df['applied_type'] = self.mongo_df['wf_loan_type'].apply(func_0)
self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]
del self.mongo_df['wf_loan_type']
print('MongoDB数据获取成功.')
if self.if_save:
self.mongo_df.to_csv(self.save_path + 'data/mongo_data.csv', index=False)
else:
self.mongo_df = pd.read_csv(self.save_path + 'data/mongo_data.csv')
# Deduplicate MySQL rows, keeping the smallest passdue_day per order.
self.mysql_df = self.mysql_df.sort_values('passdue_day')
self.mysql_df = self.mysql_df.drop_duplicates('order_no', keep='first')
print('数据去重完成.')
# Join the two sources.
self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df,
left_on='order_no', right_on='wf_biz_no', how='inner')
print('数据拼接完成.')
# Normalize timestamps to a uniform string format.
if self.merge_data['applied_at'].dtype == object:
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x[:10])
else:
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))
# Clean the score fields.
def clean_data(data):
try:
return float(data)
except:
return np.nan
na_field_list = []
for field in self.field_query_list:
if field in self.merge_data.columns.tolist():
print('正在清洗%s' % self.field_query_name_dict[field])
self.merge_data[field] = self.merge_data[field].apply(clean_data)
else:
na_field_list.append(field)
## Remove fields that could not be extracted.
print('不包含以下字段:')
for field in na_field_list:
print(self.field_query_name_dict[field])
self.field_query_list.remove(field)
self.field_name_list.remove(self.field_query_name_dict[field])
del self.field_query_name_dict[field]
# Label each row with its time window.
self.merge_data['date_label'] = ''
for i in range(len(self.date_list) - 1):
self.merge_data.loc[
(self.merge_data['applied_at'] >= '%s' % self.date_list[i]) &
(self.merge_data['applied_at'] < '%s' % self.date_list[i + 1]),
'date_label'] = self.date_list[i] + ' ~ ' + self.date_list[i + 1]
# Plot.
print('开始画图-PSI.')
for field in self.field_query_list:
self.plot_psi(field)
# Flag abnormal groups.
self.abnormal_psi()
# Save the statistics.
self.psi_info_df.to_csv(self.save_path + 'info/psi_info.csv', index=False)
print('统计信息保存成功.')
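# A minimal usage sketch, mirroring the AUCMonitor example (the arguments shown are
# the class defaults):
if __name__ == '__main__':
    psi_monitor = PSIMonitor(excel_path='./model_score.xlsx', sheet_name='model',
                             min_user_group=500, max_psi=0.1, interval_days=15,
                             save_path='./psi/',
                             if_save=True, if_read=True,
                             date_list=('2019-03-01', '2019-03-15', '2019-03-31', '2019-04-15'))
    psi_monitor.run()  # pulls the data, plots binned score distributions with PSI, writes ./psi/info/psi_info.csv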