Commit 1c84d297 authored by 舒皓月's avatar 舒皓月

20191209_0

parent c851dc48
This diff is collapsed.
......@@ -35,13 +35,6 @@ class PSIMonitor:
# 考虑到数据库配置基本不变, 所以不设置创建对象时对应输入变量.
self.mysql_engine = pymysql.connect(host='172.20.6.9',
port=9030,
user='fengkong_read_only',
passwd='mT2HFUgI',
db='risk_analysis',
charset='utf8')
self.mongo_client = pymongo.MongoClient(
"mongodb://haoyue.shu:x2egwRHk7WhQ4So1@172.18.3.22:27017/?authSource=rc_mgo_feature_dp")
self.mongo_db = self.mongo_client['rc_mgo_feature_dp']
......@@ -51,7 +44,6 @@ class PSIMonitor:
self.field_info_df = pd.read_excel(excel_path, sheet_name=sheet_name)
self.field_name_list = self.field_info_df.field_name.tolist()
self.field_query_list = self.field_info_df.field_query.tolist()
self.field_DB_list = self.field_info_df.DB.tolist()
self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
# 一些定义的常量
......@@ -62,9 +54,7 @@ class PSIMonitor:
self.date_list = date_list
# 将会从数据库中读取的数据.
self.mysql_df = None
self.mongo_df = None
self.merge_data = None
# 统计数据记录.
psi_cols = ['field_query', 'group_name']
......@@ -91,17 +81,6 @@ class PSIMonitor:
if not os.path.exists(self.save_path + 'info/'):
os.mkdir(self.save_path + 'info/')
def query_mysql(self, sql):
    """
    Run a SQL statement against the MySQL connection and return the result.

    :param sql: str, a SQL statement to execute.
    :return: DataFrame with the result set, or None when the query fails.
        Callers (see run()) rely on the None return to detect columns that
        do not exist in the risk_analysis table.
    """
    try:
        return pd.read_sql(sql, self.mysql_engine)
    except Exception as e:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate; return None explicitly, matching the
        # VLMMonitor implementation of the same method.
        print('SQL查询出现错误: %s' % e)
        return None
def query_mongo(self, condition, fields):
'''
连接MongoDB, 根据查询返回数据.
......@@ -209,7 +188,7 @@ class PSIMonitor:
def plot_psi(self, field):
# 分离数据.
df_copy = self.merge_data[[field, 'date_label', 'applied_type', 'applied_channel', 'applied_at']].copy()
df_copy = self.mongo_df[[field, 'date_label', 'applied_type', 'applied_channel', 'applied_at']].copy()
df_copy = df_copy[df_copy[field].notna()]
if df_copy.shape[0] == 0:
print('仍在空跑.')
......@@ -322,43 +301,6 @@ class PSIMonitor:
self.psi_info_df['is_abnormal'] = self.psi_info_df.apply(is_abnormal_psi, axis=1)
def run(self):
# 获取MySQL数据.
if self.if_read:
mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
real_mysql_field = []
for field in mysql_field:
tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
if tmp_df is not None:
real_mysql_field.append(field)
print('在MySQL中找到该字段: %s' % self.field_query_name_dict[field])
else:
print('在MySQL中不存在该字段: %s' % self.field_query_name_dict[field])
# 删除该字段.
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
if len(real_mysql_field) > 0:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, %s, \
applied_from, applied_channel, transacted, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (','.join(real_mysql_field), self.date_list[0], self.date_list[-1]))
else:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, transacted, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (self.date_list[0], self.date_list[-1]))
print('MySQL数据获取成功.')
if self.if_save:
self.mysql_df.to_csv(self.save_path + 'data/mysql_data.csv', index=False)
else:
self.mysql_df = pd.read_csv(self.save_path + 'data/mysql_data.csv')
def func_0(data):
try:
return int(int(data) + 1)
......@@ -369,34 +311,35 @@ class PSIMonitor:
if self.if_read:
condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.date_list[0],
'$lte': '%s 00:00:00' % self.date_list[-1]}}
fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1}
fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1,
'passdue_day': 1, 'wf_biz_channel': 1, 'applied_channel_cn': 1}
for f in self.field_query_list: # 加入Excel中预置的模型分名称
fields[f] = 1
self.mongo_df = self.query_mongo(condition, fields)
self.mongo_df['applied_type'] = self.mongo_df['wf_loan_type'].apply(func_0)
self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]
self.mongo_df['applied_at'] = self.mongo_df['wf_created_at']
self.mongo_df['applied_from'] = self.mongo_df['wf_biz_channel']
self.mongo_df['applied_channel'] = self.mongo_df['applied_channel_cn']
del self.mongo_df['wf_loan_type']
del self.mongo_df['wf_created_at']
del self.mongo_df['wf_biz_channel']
del self.mongo_df['applied_channel_cn']
self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]
print('MongoDB数据获取成功.')
if self.if_save:
self.mongo_df.to_csv(self.save_path + 'data/mongo_data.csv', index=False)
else:
self.mongo_df = pd.read_csv(self.save_path + 'data/mongo_data.csv')
# MySQL数据去重.
self.mysql_df = self.mysql_df.sort_values('passdue_day')
self.mysql_df = self.mysql_df.drop_duplicates('order_no', keep='first')
print('数据去重完成.')
# 拼接数据.
self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df,
left_on='order_no', right_on='wf_biz_no', how='inner')
print('数据拼接完成.')
# 清洗时间格式, 使其转换成统一的字符串格式.
if repr(self.merge_data['applied_at'].dtype) == "dtype('O')":
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x[:10])
if repr(self.mongo_df['applied_at'].dtype) == "dtype('O')":
self.mongo_df['applied_at'] = self.mongo_df['applied_at'].apply(lambda x: x[:10])
else:
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))
self.mongo_df['applied_at'] = self.mongo_df['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))
# 清洗数据.
def clean_data(data):
......@@ -407,9 +350,9 @@ class PSIMonitor:
na_field_list = []
for field in self.field_query_list:
if field in self.merge_data.columns.tolist():
if field in self.mongo_df.columns.tolist():
print('正在清洗%s' % self.field_query_name_dict[field])
self.merge_data[field] = self.merge_data[field].apply(clean_data)
self.mongo_df[field] = self.mongo_df[field].apply(clean_data)
else:
na_field_list.append(field)
## 去除因为一些原因未抽取到的字段.
......@@ -418,16 +361,15 @@ class PSIMonitor:
print(self.field_query_name_dict[field])
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
# 数据按时间划分.
self.merge_data['date_label'] = ''
self.mongo_df['date_label'] = ''
for i in range(len(self.date_list) - 1):
self.merge_data.loc[
(self.merge_data['applied_at'] >= '%s' % self.date_list[i]) &
(self.merge_data['applied_at'] < '%s' % self.date_list[i + 1]),
self.mongo_df.loc[
(self.mongo_df['applied_at'] >= '%s' % self.date_list[i]) &
(self.mongo_df['applied_at'] < '%s' % self.date_list[i + 1]),
'date_label'] = self.date_list[i] + ' ~ ' + self.date_list[i + 1]
# 画图.
......
......@@ -38,12 +38,6 @@ class VLMMonitor:
start_date='2019-08-01', end_date='2019-08-14'):
# 考虑到数据库配置基本不变, 所以不设置创建对象时对应输入变量.
self.mysql_engine = pymysql.connect(host='172.20.6.9',
port=9030,
user='fengkong_read_only',
passwd='mT2HFUgI',
db='risk_analysis',
charset='utf8')
self.mongo_client = pymongo.MongoClient(
"mongodb://haoyue.shu:x2egwRHk7WhQ4So1@172.18.3.22:27017/?authSource=rc_mgo_feature_dp")
......@@ -54,7 +48,6 @@ class VLMMonitor:
self.field_info_df = pd.read_excel(excel_path, sheet_name=sheet_name)
self.field_name_list = self.field_info_df.field_name.tolist()
self.field_query_list = self.field_info_df.field_query.tolist()
self.field_DB_list = self.field_info_df.DB.tolist()
self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
# 文件存储路径.
......@@ -80,9 +73,7 @@ class VLMMonitor:
self.alpha = alpha
# 数据.
self.mysql_df = None
self.mongo_df = None
self.merge_data = None
# 数据模式.
self.if_read = if_read
......@@ -110,18 +101,6 @@ class VLMMonitor:
self.min_user_group = min_user_group * (self.num_day / 30)
def query_mysql(self, sql):
    """
    Run a SQL statement against the MySQL connection and return the result.

    :param sql: str, a SQL statement to execute.
    :return: DataFrame with the result set, or None when the query fails.
        Callers (see run()) rely on the None return to detect columns that
        do not exist in the risk_analysis table.
    """
    try:
        return pd.read_sql(sql, self.mysql_engine)
    except Exception as e:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate; report the failure before returning None.
        print('SQL查询出现错误: %s' % e)
        return None
def query_mongo(self, condition, fields):
'''
连接MongoDB, 根据查询返回数据.
......@@ -272,7 +251,7 @@ class VLMMonitor:
:return: .
'''
# 过滤空跑数据.
df_copy = self.merge_data[['applied_at', 'applied_type', 'applied_channel', field]].copy()
df_copy = self.mongo_df[['applied_at', 'applied_type', 'applied_channel', field]].copy()
if df_copy.shape[0] == 0:
print('%s还在空跑.' % self.field_query_name_dict[field])
return None
......@@ -425,43 +404,6 @@ class VLMMonitor:
f.close()
def run(self):
# 读取数据.
if self.if_read:
mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
real_mysql_field = []
for field in mysql_field:
tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
if tmp_df is not None:
real_mysql_field.append(field)
print('在MySQL中找到该字段: %s' % self.field_query_name_dict[field])
else:
print('在MySQL中不存在该字段: %s' % self.field_query_name_dict[field])
# 删除该字段.
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
if real_mysql_field:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, passdue_day, %s \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (','.join(real_mysql_field), self.start_date, self.end_date))
else:
self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"'''
% (self.start_date, self.end_date))
if self.if_save:
self.mysql_df.to_csv(self.data_save_path + 'mysql_data.csv', index=False)
else:
self.mysql_df = pd.read_csv(self.data_save_path + 'mysql_data.csv')
print('MySQL数据获取成功.')
def func_0(data):
try:
return int(int(data) + 1)
......@@ -471,15 +413,25 @@ class VLMMonitor:
if self.if_read:
condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.start_date,
'$lte': '%s 00:00:00' % self.end_date}}
fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1}
fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1,
'passdue_day': 1, 'wf_biz_channel': 1, 'applied_channel_cn': 1}
mongo_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'mongoDB']
for f in mongo_field: # 加入Excel中预置的模型分名称
fields[f] = 1
self.mongo_df = self.query_mongo(condition, fields)
self.mongo_df['applied_type'] = self.mongo_df['wf_loan_type'].apply(func_0)
self.mongo_df['applied_at'] = self.mongo_df['wf_created_at']
self.mongo_df['applied_from'] = self.mongo_df['wf_biz_channel']
self.mongo_df['applied_channel'] = self.mongo_df['applied_channel_cn']
del self.mongo_df['wf_loan_type']
del self.mongo_df['wf_created_at']
del self.mongo_df['wf_biz_channel']
del self.mongo_df['applied_channel_cn']
self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]
if self.if_save:
self.mongo_df.to_csv(self.data_save_path + 'mongo_data.csv', index=False)
else:
......@@ -497,18 +449,6 @@ class VLMMonitor:
print('Mongo数据获取成功.')
# MySQL数据去重.
self.mysql_df = self.mysql_df.sort_values('passdue_day')
self.mysql_df = self.mysql_df.drop_duplicates('order_no', keep='first')
print('数据去重完成.')
# 拼接数据.
self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df,
left_on='order_no', right_on='wf_biz_no',
how='inner')
print('拼接数据成功')
# 清洗数据.
def clean_data(data):
try:
......@@ -518,20 +458,20 @@ class VLMMonitor:
na_field_list = []
for field in self.field_query_list:
if field in self.merge_data.columns.tolist():
if field in self.mongo_df.columns.tolist():
print('正在清洗%s' % self.field_query_name_dict[field])
self.merge_data[field] = self.merge_data[field].apply(clean_data)
self.mongo_df[field] = self.mongo_df[field].apply(clean_data)
else:
na_field_list.append(field)
# 清洗时间格式, 使其转换成统一的字符串格式.
if repr(self.merge_data['applied_at'].dtype) == "dtype('O')":
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x[:10])
if repr(self.mongo_df['applied_at'].dtype) == "dtype('O')":
self.mongo_df['applied_at'] = self.mongo_df['applied_at'].apply(lambda x: x[:10])
else:
self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))
self.mongo_df['applied_at'] = self.mongo_df['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))
# 确认数据时间范围.
self.merge_data = self.merge_data.loc[
(self.merge_data['applied_at'] >= self.start_date) & (self.merge_data['applied_at'] <= self.end_date)]
self.mongo_df = self.mongo_df.loc[
(self.mongo_df['applied_at'] >= self.start_date) & (self.mongo_df['applied_at'] <= self.end_date)]
# 去除因为一些原因未抽取到的字段.
print('不包含以下字段:')
......@@ -539,7 +479,6 @@ class VLMMonitor:
print(self.field_query_name_dict[field])
idx = self.field_query_list.index(field)
self.field_query_list.pop(idx)
self.field_DB_list.pop(idx)
self.field_name_list.pop(idx)
del self.field_query_name_dict[field]
......
......@@ -3,9 +3,15 @@
自动写无人分析版的月监控报告.
'''
import numpy as np
import pandas as pd
from docx import Document
from docx.shared import Cm
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.enum.table import WD_TABLE_ALIGNMENT
from datetime import datetime
import os
import warnings
......@@ -20,55 +26,194 @@ class AutoReportor:
3. 按照一定规则汇总信息, 写总结.
4. 分别对每个模型进行展示.
'''
def __init__(self):
def __init__(self, excel_path='./model_score.xlsx', sheet_name='model',
is_pdf=False):
# 创建空文档.
self.doc = Document()
# 标题.
self.doc.add_heading('模型监控报告', level=0)
# 监控模型.
self.field_info_df = pd.read_excel(excel_path, sheet_name=sheet_name)
self.field_name_list = self.field_info_df.field_name.tolist()
self.field_query_list = self.field_info_df.field_query.tolist()
self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
# 信息保存路径.
self.auc_path = './auc/'
self.vlm_path = './vlm/'
self.psi_path = './psi/'
# 读取info.
self.auc_info_df = pd.read_csv(self.auc_path + 'info/auc_info.csv')
self.psi_info_df = pd.read_csv(self.psi_path + 'info/psi_info.csv')
self.vlm_info_df = pd.read_csv(self.vlm_path + 'info/vlm_info.csv')
self.psi_info_df['name'] = self.psi_info_df.apply(lambda x: x['field_query'] + '-' + x['group_name'], axis=1)
# 读取图片名称列表.
self.psi_image_name_list = os.listdir(self.psi_path + 'image/')
self.psi_image_name_list = [x for x in self.psi_image_name_list if x.endswith('.png')]
self.vlm_image_name_list = os.listdir(self.vlm_path + 'image/image/')
self.vlm_image_name_list = [x for x in self.vlm_image_name_list if x.endswith('.png')]
self.vlm_image_name_over3std_list = os.listdir(self.vlm_path + 'image/over3std/')
self.vlm_image_name_over3std_list = [x for x in self.vlm_image_name_over3std_list if x.endswith('.png')]
self.vlm_image_name_trend_list = os.listdir(self.vlm_path + 'image/trend/')
self.vlm_image_name_trend_list = [x for x in self.vlm_image_name_trend_list if x.endswith('.png')]
self.is_pdf = is_pdf
def get_image(self, image_name, type_='psi'):
    """
    Resolve the file path of a monitoring plot.

    :param image_name: str, base name of the plot (without extension).
    :param type_: str, either 'psi' or 'vlm'; any other value prints an
        error and yields None.
    :return: str path to the .png file, or None for an unknown type_.
    """
    if type_ == 'psi':
        return '%simage/%s.png' % (self.psi_path, image_name)
    if type_ == 'vlm':
        # VLM plots are suffixed '-mean' and live in one of three
        # sub-directories depending on which list they were saved to;
        # anything not found is assumed to be a trend plot.
        file_name = image_name + '-mean.png'
        if file_name in self.vlm_image_name_list:
            sub_dir = 'image/image/'
        elif file_name in self.vlm_image_name_over3std_list:
            sub_dir = 'image/over3std/'
        else:
            sub_dir = 'image/trend/'
        return self.vlm_path + sub_dir + file_name
    print('图片种类输入错误.')
def summary(self):
    """Placeholder for the rule-based overall summary section.

    Step 3 of the class workflow ("write a summary according to rules",
    per the class docstring) is not implemented yet.
    """
    pass
def show_detail(self, name):
self.doc.add_heading(name, level=1)
# AUC
self.doc.add_paragraph('模型区分度(AUC)', style='List Bullet')
col_list = [x for x in self.auc_info_df.columns.tolist() if 'NUM' in x]
recent_col = max(col_list)
def get_info(self):
tmp_df = self.auc_info_df.loc[self.auc_info_df['model_name'] == name]
tmp_df = tmp_df.sort_values(recent_col, ascending=False)
pass
tmp_auc = tmp_df[['group_name'] + [x for x in tmp_df.columns if 'AUC' in x]]
tmp_auc.columns = [x[: -3] if 'AUC' in x else x for x in tmp_auc.columns]
n_row = tmp_auc.shape[0]
n_col = tmp_auc.shape[1]
if n_row > 15:
tmp_auc = tmp_auc.iloc[: 15]
n_row = 15
table = self.doc.add_table(rows=n_row + 1, cols=n_col, style='Table Grid')
def filter_info(self):
hdr_cells = table.rows[0].cells
for i, col in enumerate(tmp_auc.columns.tolist()):
hdr_cells[i].text = col
for row in range(n_row):
row_cells = table.rows[row + 1].cells
for col in range(n_col):
row_cells[col].text = str(tmp_auc.iloc[row, col])
pass
# PSI
self.doc.add_paragraph('模型稳定性(PSI)', style='List Bullet')
def summary(self):
tmp_df = self.psi_info_df.loc[self.psi_info_df['field_query'] == name]
tmp_df = tmp_df.loc[(self.psi_info_df['field_query'] == name) &
(self.psi_info_df['is_abnormal'])]
pass
n_row = tmp_df.shape[0]
if n_row > 5:
tmp_df = tmp_df.iloc[: 5]
n_row = 5
def show_detail(self):
paragraph = self.doc.add_paragraph()
paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
run = paragraph.add_run('')
pass
if not any(['全样本' in x for x in tmp_df['group_name'].tolist()]):
run.add_picture(self.get_image(name + '-全样本'), width=Cm(10))
for i in range(n_row):
run.add_picture(self.get_image(tmp_df.iloc[i, -1]), width=Cm(10))
# VLM
self.doc.add_paragraph('模型均值变化(VLM)', style='List Bullet')
def format(self):
tmp_df = self.vlm_info_df.loc[self.vlm_info_df['model_name'] == name]
over3std_df = tmp_df.loc[tmp_df['over_3std']]
trend_df = tmp_df.loc[tmp_df['h']].sort_values('p')
pass
self.doc.add_paragraph('整体', style='List Bullet 2')
paragraph = self.doc.add_paragraph()
paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
run = paragraph.add_run('')
run.add_picture(self.get_image(name + '-全样本'), width=Cm(10))
def run(self):
# 获取vlm, psi, auc信息.
self.doc.add_paragraph('波动', style='List Bullet 2')
# 提取重要信息.
# 初始化doc.
n_row = over3std_df.shape[0]
if n_row > 5:
over3std_df = over3std_df.iloc[: 5]
n_row = 5
# 根据规则写总结.
paragraph = self.doc.add_paragraph()
paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
run = paragraph.add_run('')
for i in range(n_row):
run.add_picture(self.get_image(over3std_df.iloc[i, 3]), width=Cm(10))
self.doc.add_paragraph('趋势', style='List Bullet 2')
n_row = trend_df.shape[0]
if n_row > 5:
trend_df = trend_df.iloc[: 5]
n_row = 5
# 写每个模型的信息.
paragraph = self.doc.add_paragraph()
paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
run = paragraph.add_run('')
for i in range(n_row):
run.add_picture(self.get_image(trend_df.iloc[i, 3]), width=Cm(10))
# 统一格式.
def format(self):
pass
# 保存.
@staticmethod
def doc2pdf(doc_path):
    """
    Convert a .docx report to PDF via Word COM automation.

    Windows-only: requires Microsoft Word and the pywin32 package.

    :param doc_path: str, path to the .docx file — the caller in run()
        passes an absolute path (os.getcwd() + filename), presumably
        because the Word COM API needs one; confirm before relaxing.
    """
    # Imported lazily so the module stays importable where pywin32 /
    # Word are unavailable and is_pdf is False.
    from win32com.client import constants, gencache
    # 'report.docx' -> 'report.pdf': strip the 4 chars 'docx', keep the dot.
    pdfPath = doc_path[: -4] + 'pdf'
    word = gencache.EnsureDispatch('Word.Application')
    doc = word.Documents.Open(doc_path, ReadOnly=1)
    # Export as PDF keeping document markup and heading bookmarks.
    doc.ExportAsFixedFormat(pdfPath,
                            constants.wdExportFormatPDF,
                            Item=constants.wdExportDocumentWithMarkup,
                            CreateBookmarks=constants.wdExportCreateHeadingBookmarks)
    # NOTE(review): the document is never Close()d before Quit — verify
    # Quit(wdDoNotSaveChanges) leaves no orphaned Word process behind.
    word.Quit(constants.wdDoNotSaveChanges)
def run(self):
# 根据规则写总结.
pass
# 写每个模型的信息.
for name in self.field_name_list:
print('正在处理 ' + name)
self.show_detail(name)
# 统一格式.
self.format()
# 保存.
today = datetime.today()
cur_year = today.year
cur_month = today.month
self.doc.save('MM_report_%d%d01.docx' % (cur_year, cur_month))
if self.is_pdf:
cur_path = os.getcwd()
self.doc2pdf(cur_path + '/MM_report_%d%d01.docx' % (cur_year, cur_month))
print('报告生成完毕, 保存成功.')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment