Commit b4fb47f1 authored by 舒皓月's avatar 舒皓月

2019-09-14

parent 7f9b4c97
tmp.py
test.py
.idea/ .idea/
test/
\ No newline at end of file
# coding=utf-8
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score
import pymysql
import pymongo
import os
import pickle
import warnings
import datetime
from dateutil.relativedelta import relativedelta
from collections import OrderedDict
warnings.filterwarnings('ignore')
class PSIMonitor:
    """Monitor PSI (Population Stability Index) drift of one model score.

    Pulls order data from MySQL and model-score data from MongoDB, merges the
    two on order number, splits the merged data into the configured date
    windows, bins the score by deciles of the first window, then computes and
    plots PSI per user group (all samples / first-apply / re-apply / re-loan)
    for a fixed channel and a fixed score field.
    """

    def __init__(self, excel_path='./model_score.xlsx', sheet_name='model',
                 min_user_group=500, max_psi=0.1,
                 if_save=True, if_read=True,
                 date_list=('2019-03-01', '2019-03-15', '2019-03-31', '2019-04-15'),
                 field_query='model_exec_data_source#fst_v6_xy_br_dhb_raw',
                 channel='融360'):
        # The database configuration rarely changes, so it is hard-coded
        # rather than exposed as constructor arguments.
        # SECURITY NOTE(review): credentials are committed in plain text —
        # consider moving them to environment variables or a secrets store.
        self.mysql_engine = pymysql.connect(host='172.20.6.9',
                                            port=9030,
                                            user='fengkong_read_only',
                                            passwd='mT2HFUgI',
                                            db='risk_analysis',
                                            charset='utf8')
        self.mongo_client = pymongo.MongoClient(
            "mongodb://haoyue.shu:x2egwRHk7WhQ4So1@172.18.3.22:27017/?authSource=rc_mgo_feature_dp")
        self.mongo_db = self.mongo_client['rc_mgo_feature_dp']
        self.mongo_table = self.mongo_db['wf_audit_log_with_feature']
        # Model metadata maintained in an Excel sheet.
        self.field_info_df = pd.read_excel(excel_path, sheet_name=sheet_name)
        self.field_name_list = self.field_info_df.field_name.tolist()
        self.field_query_list = self.field_info_df.field_query.tolist()
        self.field_app_type_list = self.field_info_df.app_type.tolist()
        self.field_app_type_list = [str(x) for x in self.field_app_type_list]
        self.field_DB_list = self.field_info_df.DB.tolist()
        self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
        self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list))
        ## Dry-run ("no-op") configuration per field.
        self.na_time = self.field_info_df.na_time.tolist()  # dry-run time window ("start~end")
        self.na_app_type = self.field_info_df.na_app_type.tolist()  # dry-run applied types
        self.na_app_chan = self.field_info_df.na_app_chan.tolist()  # dry-run channels
        # Thresholds.
        self.min_user_group = min_user_group  # minimum user-group size to keep
        self.max_psi = max_psi  # PSI above this is flagged as abnormal
        # Data scope.
        self.channel = channel
        self.field_query = field_query
        self.date_list = date_list
        # Data pulled from the databases (filled in by run()).
        self.mysql_df = None
        self.mongo_df = None
        self.merge_data = None
        # Statistics accumulator: one column pair (NUM, PSI) per date window.
        psi_cols = ['field_query', 'group_name']
        for i in range(len(date_list) - 1):
            psi_cols.append(self.date_list[i] + ' ~ ' + self.date_list[i + 1] + ' NUM: ')
            psi_cols.append(self.date_list[i] + ' ~ ' + self.date_list[i + 1] + ' PSI: ')
        self.psi_info_df = pd.DataFrame(columns=psi_cols)
        # Read/write mode.
        self.if_save = if_save  # save the extracted data to CSV
        self.if_read = if_read  # read from the databases (else from CSV cache)
        # Binning rule (set by plot_psi).
        self.bins = None
        # Output directories for plots, cached data and statistics.
        if not os.path.exists('./PSI/'):
            os.mkdir('./PSI/')
        if not os.path.exists('./data/'):
            os.mkdir('./data/')
        if not os.path.exists('./info/'):
            os.mkdir('./info/')

    def query_mysql(self, sql):
        """Run a SQL query against MySQL.

        :param sql: str, SQL statement.
        :return: DataFrame, or None if the query fails.
        """
        try:
            return pd.read_sql(sql, self.mysql_engine)
        # Narrowed from a bare `except:` so Ctrl-C / SystemExit still work.
        except Exception:
            print('SQL查询出现错误.')
            return None

    def mongo_query(self, condition, fields):
        """Run a find() against MongoDB.

        :param condition: dict, Mongo filter document.
        :param fields: dict, Mongo projection document.
        :return: DataFrame, or None if the query fails.
        """
        try:
            return pd.DataFrame(list(self.mongo_table.find(condition, fields)))
        # Narrowed from a bare `except:` so Ctrl-C / SystemExit still work.
        except Exception:
            print('Mongo查询出现错误.')
            return None

    def int2str(self, x):
        """Zero-pad a non-negative int to two characters, e.g. 5 -> '05'.

        :param x: int.
        :return: str.
        """
        if x >= 10:
            return str(x)
        else:
            return '0' + str(x)

    def make_bin(self, score_list):
        """Build equal-frequency (decile) bin edges for a score series.

        Sentinel edges +/-99999999 are added so pd.cut covers all values;
        duplicate edges are collapsed, so fewer than 11 edges may be returned.

        :param score_list: pd.Series of scores (NaNs are dropped first).
        :return: sorted list of bin edges, or None on failure.
        """
        score_list = score_list[score_list.notna()]
        try:
            bins = score_list.quantile([.1, .2, .3, .4, .5, .6, .7, .8, .9]).values.tolist()
            bins = [-99999999] + bins + [99999999]
            bins = [x for x in bins if pd.notna(x)]
            bins = list(sorted(list(set(bins))))
            return bins
        except Exception:
            print('分箱出现错误.')
            return None

    def calc_psi(self, array_1, array_2):
        """Compute PSI between two binned ratio distributions.

        NOTE(review): this uses log10; the conventional PSI definition uses
        the natural log, which would scale values by ln(10) — the max_psi
        threshold appears calibrated to this log10 variant, so it is kept.

        :param array_1: array of per-bin ratios (benchmark).
        :param array_2: array of per-bin ratios (comparison).
        :return: float PSI, or None on failure.
        """
        # Smooth both arrays to avoid division/log problems on empty bins.
        array_1 = array_1 + 0.001
        array_2 = array_2 + 0.001
        try:
            psi = ((array_1 - array_2) * np.log10(array_1 / array_2)).sum()
            return psi
        except Exception:
            return None

    def filter_data(self, df, field):
        """Remove rows that fall in the field's dry-run (no-op) window.

        :param df: DataFrame with 'applied_at', 'applied_type' and
                   'applied_channel' columns.
        :param field: str, field (query) name.
        :return: filtered DataFrame; an empty DataFrame if the field is still
                 in an open-ended dry-run period.
        """
        # First-apply Android traffic is always excluded.
        df = df[~((df['applied_type'] == 1) & (df['applied_channel'].apply(lambda x: 'Android' in x)))]
        field_idx = self.field_query_list.index(field)
        na_time = self.na_time[field_idx]
        na_type = self.na_app_type[field_idx]
        na_chan = self.na_app_chan[field_idx]
        if pd.isnull(na_time):  # no dry-run window configured: keep all rows.
            return df
        # Time window "start~end".
        t_s, t_e = na_time.split('~')
        if len(t_e) == 0:  # open-ended window: still dry-running, record nothing.
            return pd.DataFrame()
        else:
            na_df = df[
                (df['applied_at'].apply(lambda x: x[:10] >= t_s)) & (df['applied_at'].apply(lambda x: x[:10] <= t_e))]
        if na_df.shape[0] == 0:
            return df
        # Applied type: restrict the dry-run rows to the configured types.
        if pd.isnull(na_type):
            return df[~df.index.isin(na_df.index.values)]
        else:
            tmp_df = pd.DataFrame()
            for i in str(int(na_type)):
                tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)])
            na_df = tmp_df
        if na_df.shape[0] == 0:
            return df
        # Applied channel: restrict the dry-run rows to the configured channels.
        if pd.isnull(na_chan):
            return df[~df.index.isin(na_df.index.values)]
        else:
            tmp_df = pd.DataFrame()
            for i in na_chan.split(','):
                tmp_df = tmp_df.append(na_df[na_df['applied_channel'].apply(lambda x: i in x)])
            na_df = tmp_df
        if na_df.shape[0] == 0:
            return df
        return df[~df.index.isin(na_df.index.values)]

    def helper_psi(self, user_group_name=None, df=None, info_dict=None, field=None):
        """Collect per-window bin counts/ratios and PSI for one user group.

        Results are written into info_dict[user_group_name]; windows with
        fewer than min_user_group samples are skipped.

        :param user_group_name: str, user-group label.
        :param df: DataFrame, rows belonging to this user group.
        :param info_dict: dict, output accumulator (mutated in place).
        :param field: str, score column name.
        :return: None.
        """
        print('正在处理%s客群数据.' % user_group_name)
        info_dict[user_group_name] = OrderedDict()
        date_list = list(sorted(df['date_label'].unique().tolist()))
        if '' in date_list:
            date_list.remove('')
        df_g = df.groupby(['date_label', 'bins']).agg({field: ['count']})
        df_g.columns = ['_'.join(x) for x in df_g.columns.ravel()]
        df_g = df_g.reset_index()
        df_g = df_g.sort_values(['date_label', 'bins'])
        for i, date in enumerate(date_list):
            amt_in_bins = df_g.loc[df_g['date_label'] == date, ['bins', field + '_count']]
            # Left-join on the full bin index so empty bins appear with count 0.
            amt_in_bins = pd.merge(left=self.bins, right=amt_in_bins, on='bins', how='left')
            amt_in_bins[field + '_count'] = amt_in_bins[field + '_count'].fillna(0)
            amt_in_bins = amt_in_bins[field + '_count'].values
            ## Window too small: skip it.
            if amt_in_bins.sum() < self.min_user_group:
                print('%s样本量过小, 放弃提取信息.' % date)
                continue
            info_dict[user_group_name][date] = {}
            info_dict[user_group_name][date]['num'] = amt_in_bins.sum()
            info_dict[user_group_name][date]['num_in_bins'] = amt_in_bins
            info_dict[user_group_name][date]['ratio_in_bins'] = amt_in_bins / amt_in_bins.sum()
            print('%s样本量: %d' % (date, info_dict[user_group_name][date]['num']))
        # PSI of every later window against the first (benchmark) window.
        for i, date in enumerate(info_dict[user_group_name]):
            if i == 0:
                info_dict[user_group_name][date]['psi'] = 0
                bench_bins_ratio = info_dict[user_group_name][date]['ratio_in_bins']
            else:
                psi = self.calc_psi(bench_bins_ratio, info_dict[user_group_name][date]['ratio_in_bins'])
                # BUG FIX: the old check `if psi:` treated a legitimate PSI of
                # exactly 0 (identical distributions) as a failure (-999).
                if psi is not None:
                    info_dict[user_group_name][date]['psi'] = psi
                else:
                    info_dict[user_group_name][date]['psi'] = -999
                    print('计算PSI出现错误.')
        print('处理完成.')
        print('=' * 40)

    def plot_psi(self, field):
        """Bin the score, compute PSI per user group, plot and record results.

        :param field: str, score column name in merge_data.
        :return: None (returns early if the field is still dry-running or the
                 benchmark window is too small).
        """
        # Keep only the columns we need.
        df_copy = self.merge_data[[field, 'date_label', 'applied_type', 'applied_channel', 'applied_at']].copy()
        # Restrict to the configured channel.
        df_copy = df_copy[df_copy['applied_channel'].apply(lambda x: self.channel in x)]
        # Restrict to the applied types configured for this field.
        tmp_df = pd.DataFrame()
        for i in self.field_query_app_type_dict[field]:
            tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)])
        df_copy = tmp_df
        df_copy = df_copy[df_copy[field].notna()]
        # Drop dry-run rows.
        df_copy = self.filter_data(df_copy, field)
        if df_copy.shape[0] == 0:
            print('仍在空跑.')
            return None
        # Bin the score using the first date window as the benchmark.
        if df_copy.loc[df_copy['date_label'] == self.date_list[0] + ' ~ ' + self.date_list[1], field].shape[0] < self.min_user_group:
            print('基准月数据过少.')
            return None
        else:
            bins = self.make_bin(df_copy.loc[df_copy['date_label'] == self.date_list[0] + ' ~ ' + self.date_list[1], field])
            if not bins:
                return None
            df_copy['bins'] = pd.cut(df_copy[field], bins, precision=8, labels=False)  # apply the binning rule
            self.bins = pd.Series(df_copy['bins'].unique(), name='bins').sort_values()
            self.bins = self.bins.dropna()
        # info_dict layout, e.g.:
        # {'全样本':
        #     {'window_0': {'psi': 0, 'num': int,
        #                   'num_in_bins': [...], 'ratio_in_bins': [...]},
        #      'window_1': {'psi': float, 'num': int,
        #                   'num_in_bins': [...], 'ratio_in_bins': [...]}}}
        info_dict = {}
        # All samples.
        self.helper_psi('全样本', df_copy, info_dict, field)
        # Split by applied type.
        self.helper_psi('首申', df_copy.loc[df_copy['applied_type'] == 1], info_dict, field)
        self.helper_psi('复申', df_copy.loc[df_copy['applied_type'] == 2], info_dict, field)
        self.helper_psi('复贷', df_copy.loc[df_copy['applied_type'] == 3], info_dict, field)
        # Drop user groups that produced no window at all.
        remove_list = []
        for user_group_name in info_dict:
            if not info_dict[user_group_name]:
                remove_list.append(user_group_name)
        for user_group_name in remove_list:
            del info_dict[user_group_name]
        # Plot one figure per user group.
        print('开始画图.')
        print('=' * 40)
        for user_group_name in info_dict:
            print(self.field_query_name_dict[field] + '-' + user_group_name)
            plt.figure(figsize=(16, 8))
            for date in info_dict[user_group_name]:
                plt.plot(range(len(info_dict[user_group_name][date]['ratio_in_bins'])),
                         [round(x, 3) for x in info_dict[user_group_name][date]['ratio_in_bins']],
                         label='%s PSI: %.3f \n 样本量: %d' % (
                             date, info_dict[user_group_name][date]['psi'], info_dict[user_group_name][date]['num']))
            plt.legend(loc='upper right', fontsize=10)
            plt.title(self.field_query_name_dict[field] + '-' + user_group_name + '-' + self.channel, fontdict={'fontsize': 40})
            plt.subplots_adjust(left=0.03, right=0.99, top=0.91, bottom=0.03)
            plt.savefig('./PSI/' + self.field_query_name_dict[field] + '-' + user_group_name)
            plt.show()
        # Append the statistics to psi_info_df.
        for user_group_name in info_dict:
            tmp_dict = {'field_query': [self.field_query_name_dict[field]],
                        'group_name': [user_group_name]}
            for date in info_dict[user_group_name]:
                tmp_dict[date + ' NUM: '] = [int(info_dict[user_group_name][date]['num'])]
                tmp_dict[date + ' PSI: '] = [round(info_dict[user_group_name][date]['psi'], 3)]
            self.psi_info_df = self.psi_info_df.append(pd.DataFrame(tmp_dict))

    def abnormal_psi(self):
        """Flag rows of psi_info_df whose PSI in any window exceeds max_psi."""
        def is_abnormal_psi(data):
            # True if any 'PSI' column of this row is present and too large.
            for idx in data.index:
                if 'PSI' in idx and pd.notna(data[idx]) and data[idx] > self.max_psi:
                    return True
            return False
        self.psi_info_df['is_abnormal'] = self.psi_info_df.apply(is_abnormal_psi, axis=1)

    def run(self):
        """End-to-end pipeline: extract, merge, clean, label, plot, save."""
        # Fetch MySQL data (or read the CSV cache when if_read is False).
        if self.if_read:
            # Probe which Excel-declared MySQL fields actually exist; drop the rest.
            mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
            real_mysql_field = []
            for field in mysql_field:
                tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
                if tmp_df is not None:
                    real_mysql_field.append(field)
                    print('在MySQL中找到该字段: %s' % self.field_query_name_dict[field])
                else:
                    print('在MySQL中不存在该字段: %s' % self.field_query_name_dict[field])
                    # Remove the missing field from every parallel structure.
                    idx = self.field_query_list.index(field)
                    self.field_query_list.pop(idx)
                    self.field_DB_list.pop(idx)
                    self.field_app_type_list.pop(idx)
                    self.field_name_list.pop(idx)
                    del self.field_query_name_dict[field]
                    del self.field_query_app_type_dict[field]
            # Only select the monitored field itself when it lives in MySQL
            # (tmp_field carries its own trailing comma for the SQL template).
            if self.field_query in real_mysql_field:
                tmp_field = self.field_query + ','
            else:
                tmp_field = ''
            self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, %s \
                                                applied_from, applied_channel, transacted, passdue_day \
                                                FROM risk_analysis \
                                                WHERE applied_at >= "%s 00:00:00" \
                                                AND applied_at <= "%s 00:00:00"'''
                                             % (tmp_field, self.date_list[0], self.date_list[-1]))
            print('MySQL数据获取成功.')
            if self.if_save:
                self.mysql_df.to_csv('./data/' + self.field_query_name_dict[self.field_query] + '_mysql_data.csv', index=False)
        else:
            self.mysql_df = pd.read_csv('./data/' + self.field_query_name_dict[self.field_query] + '_mysql_data.csv')

        def func_0(data):
            # Map Mongo wf_loan_type (0-based) to applied_type (1-based).
            try:
                return int(int(data) + 1)
            except (TypeError, ValueError):
                return np.nan
        # Fetch MongoDB data (or read the CSV cache when if_read is False).
        if self.if_read:
            condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.date_list[0],
                                           '$lte': '%s 00:00:00' % self.date_list[-1]}}
            fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1}
            for f in self.field_query_list:  # project only the monitored score field
                if self.field_query == f:
                    fields[f] = 1
            self.mongo_df = self.mongo_query(condition, fields)
            self.mongo_df['applied_type'] = self.mongo_df['wf_loan_type'].apply(func_0)
            self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]
            del self.mongo_df['wf_loan_type']
            print('MongoDB数据获取成功.')
            if self.if_save:
                self.mongo_df.to_csv('./data/' + self.field_query_name_dict[self.field_query] + '_mongo_data.csv', index=False)
        else:
            self.mongo_df = pd.read_csv('./data/' + self.field_query_name_dict[self.field_query] + '_mongo_data.csv')
        # De-duplicate MySQL orders, keeping the row with the smallest overdue days.
        self.mysql_df = self.mysql_df.sort_values('passdue_day')
        self.mysql_df = self.mysql_df.drop_duplicates('order_no', keep='first')
        print('数据去重完成.')
        # Join the two sources on order number.
        self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df,
                                   left_on='order_no', right_on='wf_biz_no', how='inner')
        print('数据拼接完成.')
        # Normalize applied_at to a 'YYYY-MM-DD' string.
        if repr(self.merge_data['applied_at'].dtype) == "dtype('O')":
            self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x[:10])
        else:
            self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))

        # Coerce the score column to float (unparseable values become NaN).
        def clean_data(data):
            try:
                return float(data)
            except (TypeError, ValueError):
                return np.nan
        if self.field_query in self.merge_data.columns.tolist():
            print('正在清洗%s' % self.field_query_name_dict[self.field_query])
            self.merge_data[self.field_query] = self.merge_data[self.field_query].apply(clean_data)
        else:
            print('清洗失败.')
        # Label each row with its half-open date window [start, end).
        self.merge_data['date_label'] = ''
        for i in range(len(self.date_list) - 1):
            self.merge_data.loc[
                (self.merge_data['applied_at'] >= '%s' % self.date_list[i]) &
                (self.merge_data['applied_at'] < '%s' % self.date_list[i + 1]),
                'date_label'] = self.date_list[i] + ' ~ ' + self.date_list[i + 1]
        # Plot PSI.
        print('开始画图-PSI.')
        self.plot_psi(self.field_query)
        # Flag abnormal PSI values.
        self.abnormal_psi()
        # Persist the statistics.
        self.psi_info_df.to_csv('./info/' + self.field_query_name_dict[self.field_query] + '_psi_info.csv', index=False)
        print('统计信息保存成功.')
...@@ -21,9 +21,9 @@ warnings.filterwarnings('ignore') ...@@ -21,9 +21,9 @@ warnings.filterwarnings('ignore')
class ModelMonitor: class ModelMonitor:
def __init__(self, excel_path='./model_score.xlsx', sheet_name='mongo_model', def __init__(self, excel_path='./model_score.xlsx', sheet_name='model',
passdue_day=15, save_path='./image/', passdue_day=15, save_path='./image/',
num_month=4, min_user_group=1000, max_psi=0.1, min_aucr=0.85, num_month=4, min_user_group=500, max_psi=0.1, min_aucr=0.85,
if_save=True, if_read=True): if_save=True, if_read=True):
# 考虑到数据库配置基本不变, 所以不设置创建对象时对应输入变量. # 考虑到数据库配置基本不变, 所以不设置创建对象时对应输入变量.
...@@ -45,6 +45,7 @@ class ModelMonitor: ...@@ -45,6 +45,7 @@ class ModelMonitor:
self.field_query_list = self.field_info_df.field_query.tolist() self.field_query_list = self.field_info_df.field_query.tolist()
self.field_app_type_list = self.field_info_df.app_type.tolist() self.field_app_type_list = self.field_info_df.app_type.tolist()
self.field_app_type_list = [str(x) for x in self.field_app_type_list] self.field_app_type_list = [str(x) for x in self.field_app_type_list]
self.field_DB_list = self.field_info_df.DB.tolist()
self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list)) self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list)) self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list))
## 空跑信息. ## 空跑信息.
...@@ -61,13 +62,13 @@ class ModelMonitor: ...@@ -61,13 +62,13 @@ class ModelMonitor:
self.min_aucr = min_aucr # 最小AUC比率, 小于视为异常. self.min_aucr = min_aucr # 最小AUC比率, 小于视为异常.
# 获取当天日期信息. # 获取当天日期信息.
self.current_date = (datetime.date.today() + relativedelta(days=-1)).strftime('%Y-%m-%d') self.current_date = (datetime.date.today() + relativedelta(days=-2)).strftime('%Y-%m-%d')
self.response_date = (datetime.date.today() + relativedelta(days=-(31 + self.passdue_day))).strftime('%Y-%m-%d') self.response_date = (datetime.date.today() + relativedelta(days=-(32 + self.passdue_day))).strftime('%Y-%m-%d')
self.first_date = (datetime.date.today() + relativedelta(days=-1) + relativedelta( self.first_date = (datetime.date.today() + relativedelta(days=-2) + relativedelta(
months=-self.num_month + 1)).strftime('%Y-%m-01') months=-self.num_month + 1)).strftime('%Y-%m-01')
self.current_month = (datetime.date.today() + datetime.timedelta(days=-1)).month self.current_month = (datetime.date.today() + datetime.timedelta(days=-2)).month
self.response_month = (datetime.date.today() + relativedelta(days=-(31 + self.passdue_day))).month self.response_month = (datetime.date.today() + relativedelta(days=-(32 + self.passdue_day))).month
self.first_month = self.current_month - self.num_month + 1 self.first_month = self.current_month - self.num_month + 1
# 将会从数据库中读取的数据. # 将会从数据库中读取的数据.
...@@ -100,7 +101,7 @@ class ModelMonitor: ...@@ -100,7 +101,7 @@ class ModelMonitor:
self.bench_month = None self.bench_month = None
def sql_query(self, sql): def query_mysql(self, sql):
''' '''
连接MySQL数据库, 根据SQL返回数据. 连接MySQL数据库, 根据SQL返回数据.
:param sql: str. :param sql: str.
...@@ -147,15 +148,10 @@ class ModelMonitor: ...@@ -147,15 +148,10 @@ class ModelMonitor:
bins = [-99999999] + bins + [99999999] bins = [-99999999] + bins + [99999999]
bins = [x for x in bins if pd.notna(x)] bins = [x for x in bins if pd.notna(x)]
bins = list(sorted(list(set(bins)))) bins = list(sorted(list(set(bins))))
# print(bins)
# if len(set(bins)) < 11:
# return None
return bins return bins
except: except:
print('分箱出现错误.') print('分箱出现错误.')
with open('bin_error.pkl', 'wb') as f: return None
pickle.dump(score_list, f)
f.close()
def calc_psi(self, array_1, array_2): def calc_psi(self, array_1, array_2):
''' '''
...@@ -186,16 +182,13 @@ class ModelMonitor: ...@@ -186,16 +182,13 @@ class ModelMonitor:
na_time = self.na_time[field_idx] na_time = self.na_time[field_idx]
na_type = self.na_app_type[field_idx] na_type = self.na_app_type[field_idx]
na_chan = self.na_app_chan[field_idx] na_chan = self.na_app_chan[field_idx]
print(na_chan, type(na_chan))
if pd.isnull(na_time): # 没有空跑时间, 则不记录. if pd.isnull(na_time): # 没有空跑时间, 则不记录.
return df return df
# 时间. # 时间.
t_s, t_e = na_time.split('~') t_s, t_e = na_time.split('~')
print(t_s, t_e)
if len(t_e) == 0: # 若还在空跑, 则不记录. if len(t_e) == 0: # 若还在空跑, 则不记录.
return pd.DataFrame() return pd.DataFrame()
else: else:
print(df['applied_at'].head())
na_df = df[ na_df = df[
(df['applied_at'].apply(lambda x: x[:10] >= t_s)) & (df['applied_at'].apply(lambda x: x[:10] <= t_e))] (df['applied_at'].apply(lambda x: x[:10] >= t_s)) & (df['applied_at'].apply(lambda x: x[:10] <= t_e))]
if na_df.shape[0] == 0: if na_df.shape[0] == 0:
...@@ -207,7 +200,6 @@ class ModelMonitor: ...@@ -207,7 +200,6 @@ class ModelMonitor:
tmp_df = pd.DataFrame() tmp_df = pd.DataFrame()
for i in str(int(na_type)): for i in str(int(na_type)):
print(i, 'wsnd')
tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)]) tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)])
na_df = tmp_df na_df = tmp_df
if na_df.shape[0] == 0: if na_df.shape[0] == 0:
...@@ -223,7 +215,6 @@ class ModelMonitor: ...@@ -223,7 +215,6 @@ class ModelMonitor:
if na_df.shape[0] == 0: if na_df.shape[0] == 0:
return df return df
print(df.shape[0], na_df.shape[0])
return df[~df.index.isin(na_df.index.values)] return df[~df.index.isin(na_df.index.values)]
def helper_psi(self, user_group_name=None, df=None, info_dict=None, field=None): def helper_psi(self, user_group_name=None, df=None, info_dict=None, field=None):
...@@ -406,7 +397,7 @@ class ModelMonitor: ...@@ -406,7 +397,7 @@ class ModelMonitor:
df_copy_g = df_copy_g.reset_index() df_copy_g = df_copy_g.reset_index()
## 过滤小客群. ## 过滤小客群.
df_copy_g = df_copy_g.loc[df_copy_g[field] > self.min_user_group * self.num_month] df_copy_g = df_copy_g.loc[df_copy_g[field] > self.min_user_group]
tmp_df = pd.DataFrame() tmp_df = pd.DataFrame()
for i in range(df_copy_g.shape[0]): for i in range(df_copy_g.shape[0]):
...@@ -438,21 +429,18 @@ class ModelMonitor: ...@@ -438,21 +429,18 @@ class ModelMonitor:
print(self.field_query_name_dict[field] + '-' + user_group_name) print(self.field_query_name_dict[field] + '-' + user_group_name)
plt.figure(figsize=(16, 8)) plt.figure(figsize=(16, 8))
for m in info_dict[user_group_name]: for m in info_dict[user_group_name]:
# print(m)
# print(info_dict[user_group_name][m]['psi'])
# print(info_dict[user_group_name][m]['该月样本量'])
plt.plot(range(len(info_dict[user_group_name][m]['各分箱样本占比'])), plt.plot(range(len(info_dict[user_group_name][m]['各分箱样本占比'])),
[round(x, 3) for x in info_dict[user_group_name][m]['各分箱样本占比']], [round(x, 3) for x in info_dict[user_group_name][m]['各分箱样本占比']],
label='%s PSI: %.3f \n 样本量: %d' % ( label='%s PSI: %.3f \n 样本量: %d' % (
m, info_dict[user_group_name][m]['psi'], info_dict[user_group_name][m]['该月样本量'])) m, info_dict[user_group_name][m]['psi'], info_dict[user_group_name][m]['该月样本量']))
plt.legend(loc='upper right') plt.legend(loc='upper right', fontsize=10)
plt.title(self.field_query_name_dict[field] + '-' + user_group_name) plt.title(self.field_query_name_dict[field] + '-' + user_group_name, fontdict={'fontsize': 40})
plt.subplots_adjust(left=0.03, right=0.99, top=0.91, bottom=0.03)
plt.savefig(self.save_path + 'PSI/' + self.field_query_name_dict[field] + '-' + user_group_name) plt.savefig(self.save_path + 'PSI/' + self.field_query_name_dict[field] + '-' + user_group_name)
plt.show() plt.show()
# 保存统计信息. # 保存统计信息.
for user_group_name in info_dict: for user_group_name in info_dict:
# print(self.model_feild_name_dict[field] + '-' + user_group_name)
tmp_dict = {'模型名称': [self.field_query_name_dict[field]], tmp_dict = {'模型名称': [self.field_query_name_dict[field]],
'客群名称': [user_group_name]} '客群名称': [user_group_name]}
for m in info_dict[user_group_name]: for m in info_dict[user_group_name]:
...@@ -469,7 +457,10 @@ class ModelMonitor: ...@@ -469,7 +457,10 @@ class ModelMonitor:
# 分离数据. # 分离数据.
df_copy = self.merge_data[ df_copy = self.merge_data[
[field, 'month_label', 'applied_type', 'applied_channel', 'overdue', 'passdue_day', 'applied_at']].copy() [field, 'month_label', 'applied_type', 'applied_channel', 'overdue', 'passdue_day', 'applied_at']].copy()
# 筛选出放款, 且逾期表现的数据.
df_copy = df_copy[(df_copy['overdue'].notna()) & (df_copy[field].notna())] df_copy = df_copy[(df_copy['overdue'].notna()) & (df_copy[field].notna())]
# 筛选正确申请类型的数据.
tmp_df = pd.DataFrame() tmp_df = pd.DataFrame()
for i in self.field_query_app_type_dict[field]: for i in self.field_query_app_type_dict[field]:
tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)]) tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)])
...@@ -479,7 +470,7 @@ class ModelMonitor: ...@@ -479,7 +470,7 @@ class ModelMonitor:
if df_copy.shape[0] == 0: if df_copy.shape[0] == 0:
print('仍在空跑.') print('仍在空跑.')
return None return None
## 筛选出放款, 且逾期表现的数据. # 统一时间格式.
if repr(df_copy['applied_at'].dtype) == "dtype('O')": if repr(df_copy['applied_at'].dtype) == "dtype('O')":
df_copy = df_copy.loc[ df_copy = df_copy.loc[
(df_copy[field].notna()) & (df_copy['applied_at'].apply(lambda x: x[:10]) <= self.response_date) & ( (df_copy[field].notna()) & (df_copy['applied_at'].apply(lambda x: x[:10]) <= self.response_date) & (
...@@ -543,7 +534,7 @@ class ModelMonitor: ...@@ -543,7 +534,7 @@ class ModelMonitor:
df_copy_g = df_copy.groupby(['applied_type', 'applied_channel'])[field].count().sort_values(ascending=False) df_copy_g = df_copy.groupby(['applied_type', 'applied_channel'])[field].count().sort_values(ascending=False)
df_copy_g = df_copy_g.reset_index() df_copy_g = df_copy_g.reset_index()
## 过滤小客群. ## 过滤小客群.
df_copy_g = df_copy_g.loc[df_copy_g[field] > self.min_user_group * (self.num_month - 1)] df_copy_g = df_copy_g.loc[df_copy_g[field] > self.min_user_group]
app_type_set = df_copy_g['applied_type'].unique() app_type_set = df_copy_g['applied_type'].unique()
app_chan_set = df_copy_g['applied_channel'].unique() app_chan_set = df_copy_g['applied_channel'].unique()
for app_type in app_type_set: for app_type in app_type_set:
...@@ -583,8 +574,9 @@ class ModelMonitor: ...@@ -583,8 +574,9 @@ class ModelMonitor:
label='%s AUC: %.3f AUCR: %.3f \n 样本量: %d' % ( label='%s AUC: %.3f AUCR: %.3f \n 样本量: %d' % (
m, info_dict[user_group_name][m]['auc'], info_dict[user_group_name][m]['aucR'], m, info_dict[user_group_name][m]['auc'], info_dict[user_group_name][m]['aucR'],
info_dict[user_group_name][m]['该月样本量'])) info_dict[user_group_name][m]['该月样本量']))
plt.legend(loc='upper right') plt.legend(loc='upper right', fontsize=10)
plt.title(self.field_query_name_dict[field] + '-' + user_group_name) plt.title(self.field_query_name_dict[field] + '-' + user_group_name, fontdict={'fontsize': 40})
plt.subplots_adjust(left=0.03, right=0.99, top=0.91, bottom=0.03)
plt.savefig(self.save_path + 'AUC/' + self.field_query_name_dict[field] + '-' + user_group_name) plt.savefig(self.save_path + 'AUC/' + self.field_query_name_dict[field] + '-' + user_group_name)
plt.show() plt.show()
...@@ -620,12 +612,29 @@ class ModelMonitor: ...@@ -620,12 +612,29 @@ class ModelMonitor:
def run(self): def run(self):
# 获取MySQL数据, 取近期num_month个月数据(如今天7.27, 则这27天算进7月). # 获取MySQL数据, 取近期num_month个月数据(如今天7.27, 则这27天算进7月).
if self.if_read: if self.if_read:
self.mysql_df = self.sql_query('''SELECT order_no, applied_at, mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
applied_from, applied_channel, transacted, passdue_day real_mysql_field = []
FROM risk_analysis for field in mysql_field:
WHERE applied_at >= "%s 00:00:00" tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
if tmp_df is not None:
real_mysql_field.append(field)
print('在MySQL中找到该字段: %s' % self.field_query_name_dict[field])
else:
print('在MySQL中不存在该字段: %s' % self.field_query_name_dict[field])
# 删除该字段.
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_app_type_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
del self.field_query_app_type_dict[field]
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, %s, \
applied_from, applied_channel, transacted, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"''' AND applied_at <= "%s 00:00:00"'''
% (self.first_date, datetime.date.today().strftime('%Y-%m-%d'))) % (','.join(real_mysql_field), self.first_date, datetime.date.today().strftime('%Y-%m-%d')))
print('MySQL数据获取成功.') print('MySQL数据获取成功.')
if self.if_save: if self.if_save:
self.mysql_df.to_csv('./mysql_data.csv', index=False) self.mysql_df.to_csv('./mysql_data.csv', index=False)
......
...@@ -19,7 +19,7 @@ import warnings ...@@ -19,7 +19,7 @@ import warnings
warnings.filterwarnings('ignore') warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['savefig.dpi'] = 100 plt.rcParams['savefig.dpi'] = 150
class ModelMonitorVLM: class ModelMonitorVLM:
...@@ -28,7 +28,7 @@ class ModelMonitorVLM: ...@@ -28,7 +28,7 @@ class ModelMonitorVLM:
fig_save_path='./image/', info_save_path='./info/', fig_save_path='./image/', info_save_path='./info/',
data_save_path='./data/', data_save_path='./data/',
if_read=True, if_save=True, if_read=True, if_save=True,
alpha=0.01, min_user_group=10000): alpha=0.01, min_user_group=2000):
# 考虑到数据库配置基本不变, 所以不设置创建对象时对应输入变量. # 考虑到数据库配置基本不变, 所以不设置创建对象时对应输入变量.
self.mysql_engine = pymysql.connect(host='172.20.6.9', self.mysql_engine = pymysql.connect(host='172.20.6.9',
...@@ -49,6 +49,7 @@ class ModelMonitorVLM: ...@@ -49,6 +49,7 @@ class ModelMonitorVLM:
self.field_query_list = self.field_info_df.field_query.tolist() self.field_query_list = self.field_info_df.field_query.tolist()
self.field_app_type_list = self.field_info_df.app_type.tolist() self.field_app_type_list = self.field_info_df.app_type.tolist()
self.field_app_type_list = [str(x) for x in self.field_app_type_list] self.field_app_type_list = [str(x) for x in self.field_app_type_list]
self.field_DB_list = self.field_info_df.DB.tolist()
self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list)) self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list)) self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list))
## 空跑信息. ## 空跑信息.
...@@ -86,14 +87,14 @@ class ModelMonitorVLM: ...@@ -86,14 +87,14 @@ class ModelMonitorVLM:
self.if_save = if_save self.if_save = if_save
# 获取当天日期信息. # 获取当天日期信息.
self.current_date = (datetime.date.today() + relativedelta(days=-1)).strftime('%Y-%m-%d') self.current_date = (datetime.date.today() + relativedelta(days=-2)).strftime('%Y-%m-%d')
self.third_date = datetime.date.today().strftime('%Y-%m-01') self.third_date = (datetime.date.today() + relativedelta(days=-2)).strftime('%Y-%m-01')
self.second_date = (datetime.date.today() + relativedelta(months=-1)).strftime('%Y-%m-01') self.second_date = (datetime.date.today() + relativedelta(days=-2) + relativedelta(months=-1)).strftime('%Y-%m-01')
self.first_date = (datetime.date.today() + relativedelta(months=-2)).strftime('%Y-%m-01') self.first_date = (datetime.date.today() + relativedelta(days=-2) + relativedelta(months=-2)).strftime('%Y-%m-01')
self.current_month = (datetime.date.today() + datetime.timedelta(days=-1)).month self.third_month = (datetime.date.today() + relativedelta(days=-2)).month
self.second_month = (datetime.date.today() + relativedelta(months=-1)).month self.second_month = (datetime.date.today() + relativedelta(days=-2) + relativedelta(months=-1)).month
self.first_month = (datetime.date.today() + relativedelta(months=-2)).month self.first_month = (datetime.date.today() + relativedelta(days=-2) + relativedelta(months=-2)).month
self.num_day = len(pd.date_range(self.first_date, self.current_date)) self.num_day = len(pd.date_range(self.first_date, self.current_date))
...@@ -124,7 +125,7 @@ class ModelMonitorVLM: ...@@ -124,7 +125,7 @@ class ModelMonitorVLM:
return pd.read_sql(sql, self.mysql_engine) return pd.read_sql(sql, self.mysql_engine)
except: except:
print('SQL查询出现错误.') print('SQL查询出现错误.')
pass return None
def query_mongo(self, condition, fields): def query_mongo(self, condition, fields):
''' '''
...@@ -147,22 +148,18 @@ class ModelMonitorVLM: ...@@ -147,22 +148,18 @@ class ModelMonitorVLM:
:param field: str, 字段名. :param field: str, 字段名.
:return: df, 过滤后的数据. :return: df, 过滤后的数据.
''' '''
df = df[~((df['applied_type'] == 1) & (df['applied_channel'].apply(lambda x: 'Android' in x)))] df = df[~((df['applied_type'] == 1) & (df['applied_channel'].apply(lambda x: 'Android' in x)))]
field_idx = self.field_query_list.index(field) field_idx = self.field_query_list.index(field)
na_time = self.na_time[field_idx] na_time = self.na_time[field_idx]
na_type = self.na_app_type[field_idx] na_type = self.na_app_type[field_idx]
na_chan = self.na_app_chan[field_idx] na_chan = self.na_app_chan[field_idx]
print(na_chan, type(na_chan))
if pd.isnull(na_time): # 没有空跑时间, 则不记录. if pd.isnull(na_time): # 没有空跑时间, 则不记录.
return df return df
# 时间. # 时间.
t_s, t_e = na_time.split('~') t_s, t_e = na_time.split('~')
print(t_s, t_e)
if len(t_e) == 0: # 若还在空跑, 则不记录. if len(t_e) == 0: # 若还在空跑, 则不记录.
return pd.DataFrame() return pd.DataFrame()
else: else:
print(df['applied_at'].head())
na_df = df[ na_df = df[
(df['applied_at'].apply(lambda x: x[:10] >= t_s)) & (df['applied_at'].apply(lambda x: x[:10] <= t_e))] (df['applied_at'].apply(lambda x: x[:10] >= t_s)) & (df['applied_at'].apply(lambda x: x[:10] <= t_e))]
if na_df.shape[0] == 0: if na_df.shape[0] == 0:
...@@ -174,7 +171,6 @@ class ModelMonitorVLM: ...@@ -174,7 +171,6 @@ class ModelMonitorVLM:
tmp_df = pd.DataFrame() tmp_df = pd.DataFrame()
for i in str(int(na_type)): for i in str(int(na_type)):
print(i, 'wsnd')
tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)]) tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)])
na_df = tmp_df na_df = tmp_df
if na_df.shape[0] == 0: if na_df.shape[0] == 0:
...@@ -190,10 +186,8 @@ class ModelMonitorVLM: ...@@ -190,10 +186,8 @@ class ModelMonitorVLM:
if na_df.shape[0] == 0: if na_df.shape[0] == 0:
return df return df
print(df.shape[0], na_df.shape[0])
return df[~df.index.isin(na_df.index.values)] return df[~df.index.isin(na_df.index.values)]
def mk_test(self, x, alpha=0.01): def mk_test(self, x, alpha=0.01):
''' '''
MK test. MK test.
...@@ -257,8 +251,12 @@ class ModelMonitorVLM: ...@@ -257,8 +251,12 @@ class ModelMonitorVLM:
if data < 0 or data > 999999: if data < 0 or data > 999999:
return np.nan return np.nan
return data return data
print(df.head()) # 报错则放弃该变量.
try:
df = df[['applied_at', field]] df = df[['applied_at', field]]
except:
print('字段处理发生错误: %s' % self.field_query_name_dict[field])
return None
df[field] = df[field].apply(set_na) df[field] = df[field].apply(set_na)
# 计算该字段在每天的均值, 数量, 缺失率, 零率. # 计算该字段在每天的均值, 数量, 缺失率, 零率.
...@@ -332,7 +330,6 @@ class ModelMonitorVLM: ...@@ -332,7 +330,6 @@ class ModelMonitorVLM:
tmp_df = pd.DataFrame() tmp_df = pd.DataFrame()
for i in self.field_query_app_type_dict[field]: for i in self.field_query_app_type_dict[field]:
print(i)
tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)]) tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)])
df_copy = tmp_df df_copy = tmp_df
# 收集覆盖客群. # 收集覆盖客群.
...@@ -349,7 +346,6 @@ class ModelMonitorVLM: ...@@ -349,7 +346,6 @@ class ModelMonitorVLM:
user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan) user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan)
if df_copy_g.iloc[i][field] > int(self.min_user_group * self.num_day / 30): if df_copy_g.iloc[i][field] > int(self.min_user_group * self.num_day / 30):
main_user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan) main_user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan)
print(app_type, app_chan)
del df_copy_g del df_copy_g
# 过滤非覆盖数据. # 过滤非覆盖数据.
...@@ -359,6 +355,11 @@ class ModelMonitorVLM: ...@@ -359,6 +355,11 @@ class ModelMonitorVLM:
(df_copy['applied_type'] == user_group_dict[user_group_name][0]) & ( (df_copy['applied_type'] == user_group_dict[user_group_name][0]) & (
df_copy['applied_channel'] == user_group_dict[user_group_name][1])]) df_copy['applied_channel'] == user_group_dict[user_group_name][1])])
df_copy = tmp_df df_copy = tmp_df
# 报错: 无applied_type.
try:
df_copy['applied_type']
except:
return None
## 覆盖全样本. ## 覆盖全样本.
self.process_data_helper(group_name=self.field_query_name_dict[field] + '-全样本', df=df_copy, field=field) self.process_data_helper(group_name=self.field_query_name_dict[field] + '-全样本', df=df_copy, field=field)
...@@ -440,9 +441,10 @@ class ModelMonitorVLM: ...@@ -440,9 +441,10 @@ class ModelMonitorVLM:
linestyles='--') linestyles='--')
# 展示. # 展示.
plt.title(user_group_name + '-mean') plt.title(user_group_name + '-mean', fontdict={'fontsize': 40})
plt.grid() plt.grid()
plt.xticks([]) plt.xticks([])
plt.subplots_adjust(left=0.03, right=0.99, top=0.94, bottom=0.08)
# 分开保存. # 分开保存.
is_save = False is_save = False
...@@ -488,10 +490,35 @@ class ModelMonitorVLM: ...@@ -488,10 +490,35 @@ class ModelMonitorVLM:
def run(self): def run(self):
# 读取数据. # 读取数据.
if self.if_read: if self.if_read:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
applied_from, applied_channel, passdue_day real_mysql_field = []
FROM risk_analysis for field in mysql_field:
WHERE applied_at >= "%s 00:00:00" tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
if tmp_df is not None:
real_mysql_field.append(field)
print('在MySQL中找到该字段: %s' % self.field_query_name_dict[field])
else:
print('在MySQL中不存在该字段: %s' % self.field_query_name_dict[field])
# 删除该字段.
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_app_type_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
del self.field_query_app_type_dict[field]
if real_mysql_field:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, passdue_day, %s \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (','.join(real_mysql_field), self.first_date, datetime.date.today().strftime('%Y-%m-%d')))
else:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"''' AND applied_at <= "%s 00:00:00"'''
% (self.first_date, datetime.date.today().strftime('%Y-%m-%d'))) % (self.first_date, datetime.date.today().strftime('%Y-%m-%d')))
if self.if_save: if self.if_save:
...@@ -509,7 +536,8 @@ class ModelMonitorVLM: ...@@ -509,7 +536,8 @@ class ModelMonitorVLM:
condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.first_date, condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.first_date,
'$lte': '%s 00:00:00' % datetime.date.today().strftime('%Y-%m-%d')}} '$lte': '%s 00:00:00' % datetime.date.today().strftime('%Y-%m-%d')}}
fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1} fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1}
for f in self.field_query_list: # 加入Excel中预置的模型分名称 mongo_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'mongoDB']
for f in mongo_field: # 加入Excel中预置的模型分名称
fields[f] = 1 fields[f] = 1
self.mongo_df = self.query_mongo(condition, fields) self.mongo_df = self.query_mongo(condition, fields)
...@@ -521,6 +549,13 @@ class ModelMonitorVLM: ...@@ -521,6 +549,13 @@ class ModelMonitorVLM:
else: else:
self.mongo_df = pd.read_csv(self.data_save_path + 'mongo_data.csv') self.mongo_df = pd.read_csv(self.data_save_path + 'mongo_data.csv')
self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()] self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]
def func_1(data):
try:
int(data)
return True
except:
return False
self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].apply(func_1)]
print('Mongo数据获取成功.') print('Mongo数据获取成功.')
...@@ -533,6 +568,7 @@ class ModelMonitorVLM: ...@@ -533,6 +568,7 @@ class ModelMonitorVLM:
self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df, self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df,
left_on='order_no', right_on='wf_biz_no', left_on='order_no', right_on='wf_biz_no',
how='inner') how='inner')
print('拼接数据成功') print('拼接数据成功')
# 清洗数据. # 清洗数据.
......
No preview for this file type
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment