decision-science / model_monitoring_monthly / Commits

Commit c1985e9d, authored Sep 03, 2019 by 舒皓月
Commit message: 2019-09-03
Parent: b4fb47f1

Showing 3 changed files, with 633 additions and 2 deletions (+633, -2):
    PSI_time_filed_channel.py  (+2, -2)
    VLM_time.py                (+631, -0)
    model_score.xlsx           (+0, -0)
PSI.py → PSI_time_filed_channel.py  (view file @ c1985e9d)

@@ -28,6 +28,7 @@ class PSIMonitor:
                  min_user_group=500, max_psi=0.1,
                  if_save=True, if_read=True,
                  date_list=('2019-03-01', '2019-03-15', '2019-03-31', '2019-04-15'),
+                 interval_days=15,
                  field_query='model_exec_data_source#fst_v6_xy_br_dhb_raw',
                  channel='融360'):
@@ -60,7 +61,7 @@ class PSIMonitor:
         self.na_app_chan = self.field_info_df.na_app_chan.tolist()  # dry-run channels
         # Some predefined constants.
-        self.min_user_group = min_user_group  # minimum user-group size.
+        self.min_user_group = min_user_group * interval_days / 30  # minimum user-group size.
         self.max_psi = max_psi  # maximum PSI; above this is treated as anomalous.
         # Data spec.
@@ -260,7 +261,6 @@ class PSIMonitor:
         print('Processing finished.')
         print('=' * 40)
-
     def plot_psi(self, field):
         # Split the data.
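The substantive change here is that PSIMonitor's minimum user-group threshold is now scaled to the comparison window: min_user_group=500 is read as a per-30-days count. A minimal sketch of the arithmetic (standalone illustration, not code from the commit):

def effective_min_user_group(min_user_group=500, interval_days=15):
    # Mirrors the new line: min_user_group * interval_days / 30.
    return min_user_group * interval_days / 30

print(effective_min_user_group())         # 250.0 for the default 15-day window
print(effective_min_user_group(500, 30))  # 500.0, i.e. unchanged for a full month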
VLM_time.py  (new file, mode 100644; view file @ c1985e9d)
# coding=utf-8
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import pickle
import datetime
from dateutil.relativedelta import relativedelta
from collections import OrderedDict
from scipy.stats import norm
import pymysql
import pymongo
import warnings

warnings.filterwarnings('ignore')

plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['savefig.dpi'] = 150


# TODO
# Save the images per model.
# Add model name, application type and channel to the info.
class VLMMonitor:
    '''
    Monitor the VLM of fields over a fixed time window.
    '''
    def __init__(self,
                 excel_path='./model_score.xlsx',
                 sheet_name='model',
                 save_path='./vlm/',
                 if_read=True,
                 if_save=True,
                 alpha=0.01,
                 min_user_group=600,
                 start_date='2019-08-01',
                 end_date='2019-08-14'):
        # The database configuration rarely changes, so it is not exposed as constructor arguments.
        self.mysql_engine = pymysql.connect(host='172.20.6.9', port=9030,
                                            user='fengkong_read_only', passwd='mT2HFUgI',
                                            db='risk_analysis', charset='utf8')
        self.mongo_client = pymongo.MongoClient(
            "mongodb://haoyue.shu:x2egwRHk7WhQ4So1@172.18.3.22:27017/?authSource=rc_mgo_feature_dp")
        self.mongo_db = self.mongo_client['rc_mgo_feature_dp']
        self.mongo_table = self.mongo_db['wf_audit_log_with_feature']
        # Read the model information maintained in the Excel sheet.
        self.field_info_df = pd.read_excel(excel_path, sheet_name=sheet_name)
        self.field_name_list = self.field_info_df.field_name.tolist()
        self.field_query_list = self.field_info_df.field_query.tolist()
        self.field_app_type_list = self.field_info_df.app_type.tolist()
        self.field_app_type_list = [str(x) for x in self.field_app_type_list]
        self.field_DB_list = self.field_info_df.DB.tolist()
        self.field_query_name_dict = dict(zip(self.field_query_list, self.field_name_list))
        self.field_query_app_type_dict = dict(zip(self.field_query_list, self.field_app_type_list))
        ## Dry-run information.
        self.na_time = self.field_info_df.na_time.tolist()  # dry-run periods
        self.na_app_type = self.field_info_df.na_app_type.tolist()  # dry-run application types
        self.na_app_chan = self.field_info_df.na_app_chan.tolist()  # dry-run channels
        # Output paths.
        self.fig_save_path = save_path + 'image/'
        self.info_save_path = save_path + 'info/'
        self.data_save_path = save_path + 'data/'
        if not os.path.exists(save_path):
            os.mkdir(save_path)
        if not os.path.exists(self.fig_save_path):
            os.mkdir(self.fig_save_path)
        if not os.path.exists(self.fig_save_path + 'image/'):
            os.mkdir(self.fig_save_path + 'image/')
        if not os.path.exists(self.fig_save_path + 'trend/'):
            os.mkdir(self.fig_save_path + 'trend')
        if not os.path.exists(self.fig_save_path + 'over3std/'):
            os.mkdir(self.fig_save_path + 'over3std/')
        if not os.path.exists(self.info_save_path):
            os.mkdir(self.info_save_path)
        if not os.path.exists(self.data_save_path):
            os.mkdir(self.data_save_path)
        # Significance level for the MK test.
        self.alpha = alpha
        # Data.
        self.mysql_df = None
        self.mongo_df = None
        self.merge_data = None
        # Data mode.
        self.if_read = if_read
        self.if_save = if_save
        # Time range.
        self.start_date = start_date
        self.end_date = (pd.datetime.strptime(end_date, '%Y-%m-%d')
                         + relativedelta(days=1)).strftime('%Y-%m-%d')
        self.num_day = (pd.datetime.strptime(self.end_date, '%Y-%m-%d')
                        - pd.datetime.strptime(self.start_date, '%Y-%m-%d')).days
        # Collected statistics.
        # {group_name_0: {time_list: [...],
        #                 value_list: [...],
        #                 count_list: [...],
        #                 miss_rate_list: [...],
        #                 zero_rate_list: [...],
        #                 mean: float,
        #                 std: float,
        #                 trend: str,
        #                 h: bool,
        #                 p: float,
        #                 over_3std: bool}
        self.vlm_info_dict = OrderedDict()
        self.vlm_info_df = None
        self.min_user_group = min_user_group * (self.num_day / 30)

    def query_mysql(self, sql):
        '''
        Query the MySQL database and return the result.
        :param sql: str.
        :return: DataFrame.
        '''
        try:
            return pd.read_sql(sql, self.mysql_engine)
        except:
            print('SQL query failed.')
            return None

    def query_mongo(self, condition, fields):
        '''
        Query MongoDB and return the result.
        :param condition: dict
        :param fields: dict
        :return: DataFrame
        '''
        try:
            return pd.DataFrame(list(self.mongo_table.find(condition, fields)))
        except:
            print('Mongo query failed.')
            pass

    def filter_data(self, df, field):
        '''
        Filter out dry-run data.
        :param df: df.
        :param field: str, field name.
        :return: df, the filtered data.
        '''
        df = df[~((df['applied_type'] == 1) & (df['applied_channel'].apply(lambda x: 'Android' in x)))]
        field_idx = self.field_query_list.index(field)
        na_time = self.na_time[field_idx]
        na_type = self.na_app_type[field_idx]
        na_chan = self.na_app_chan[field_idx]
        if pd.isnull(na_time):
            # No dry-run period, nothing to exclude.
            return df
        # Time window.
        t_s, t_e = na_time.split('~')
        if len(t_e) == 0:
            # Still in dry run, keep nothing.
            return pd.DataFrame()
        else:
            na_df = df[(df['applied_at'].apply(lambda x: x[:10] >= t_s))
                       & (df['applied_at'].apply(lambda x: x[:10] <= t_e))]
        if na_df.shape[0] == 0:
            return df
        # Application type.
        if pd.isnull(na_type):
            return df[~df.index.isin(na_df.index.values)]
        else:
            tmp_df = pd.DataFrame()
            for i in str(int(na_type)):
                tmp_df = tmp_df.append(na_df[na_df['applied_type'] == int(i)])
            na_df = tmp_df
        if na_df.shape[0] == 0:
            return df
        # Application channel.
        if pd.isnull(na_chan):
            return df[~df.index.isin(na_df.index.values)]
        else:
            tmp_df = pd.DataFrame()
            for i in na_chan.split(','):
                tmp_df = tmp_df.append(na_df[na_df['applied_channel'].apply(lambda x: i in x)])
            na_df = tmp_df
        if na_df.shape[0] == 0:
            return df
        return df[~df.index.isin(na_df.index.values)]
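
    # Note on the Excel dry-run columns consumed above (inferred from this
    # method, not documented elsewhere in the commit): na_time is a '~'-separated
    # range such as '2019-06-01~2019-07-01', with an empty right-hand side
    # meaning the field is still in dry run; na_app_type is a digit string like
    # '13' meaning applied types 1 and 3; na_app_chan is a comma-separated list
    # of channel-name substrings.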

    def mk_test(self, x, alpha=0.01):
        '''
        Mann-Kendall test, used to detect a trend in a time series.
        :param x: list[float]
        :param alpha: float, significance level.
        :return: trend, whether it is significant, p-value, z statistic.
        '''
        if type(x) != list:
            x = x.values.tolist()
        n = len(x)
        # calculate S
        s = 0
        for k in range(n - 1):
            for j in range(k + 1, n):
                s += np.sign(x[j] - x[k])
        # calculate the unique data
        unique_x = np.unique(x)
        g = len(unique_x)
        # calculate the var(s)
        if n == g:
            # there is no tie
            var_s = (n * (n - 1) * (2 * n + 5)) / 18
        else:
            # there are some ties in data
            tp = np.zeros(unique_x.shape)
            for i in range(len(unique_x)):
                tp[i] = sum(x == unique_x[i])
            var_s = (n * (n - 1) * (2 * n + 5) - np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18
        if s > 0:
            z = (s - 1) / np.sqrt(var_s)
        elif s < 0:
            z = (s + 1) / np.sqrt(var_s)
        else:  # s == 0
            z = 0
        # calculate the p_value
        p = 2 * (1 - norm.cdf(abs(z)))  # two tail test
        h = abs(z) > norm.ppf(1 - alpha / 2)
        if (z < 0) and h:
            trend = 'decreasing'
        elif (z > 0) and h:
            trend = 'increasing'
        else:
            trend = 'no trend'
        return trend, h, p, z
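
    # For reference, the tie-corrected variance used above is the standard
    # Mann-Kendall formula
    #     var(S) = [n(n-1)(2n+5) - sum_p t_p(t_p-1)(2t_p+5)] / 18,
    # where t_p counts occurrences of the p-th tied value, and the +/-1 applied
    # to S is the usual continuity correction. Sanity check (hypothetical call,
    # not in this file): on a strictly increasing series such as
    # monitor.mk_test(list(range(10)), alpha=0.01) this returns
    # ('increasing', True, p~8e-05, z~3.94).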

    def process_data_helper(self, group_name=None, df=None, field=None):
        print('Processing %s' % group_name)
        # Build a contiguous date series.
        date_list = pd.date_range(self.start_date, self.end_date).strftime('%Y-%m-%d').values.tolist()[:-1]

        # Mark special values as missing.
        def set_na(data):
            if pd.isnull(data):
                return np.nan
            if data < 0 or data > 999999:
                return np.nan
            return data

        # On error, give up on this variable.
        try:
            df = df[['applied_at', field]]
        except:
            print('Error while processing field: %s' % self.field_query_name_dict[field])
            return None
        df[field] = df[field].apply(set_na)

        # Compute the field's daily mean, count, missing rate and zero rate.
        def count(data):
            return len(data)

        def miss_rate(data):
            return data.isnull().mean()

        def zero_rate(data):
            return (data == 0).mean()

        df_g = df.groupby('applied_at').agg({field: ['mean', count, miss_rate, zero_rate]})
        df_g.columns = ['_'.join(x) for x in df_g.columns.ravel()]
        df_g = df_g.reset_index()
        # Fill the mean with 0 on days without data.
        tmp_df = pd.DataFrame({'applied_at': date_list})
        df_g = pd.merge(left=tmp_df, right=df_g, on='applied_at', how='left')
        df_g = df_g.fillna(0)
        # Overall mean of the field.
        mean = df[field].mean()
        # Overall standard deviation (of the daily means).
        std = df_g[field + '_mean'].std()
        # Trend information for the field.
        trend, h, p, z = self.mk_test(df_g[field + '_mean'], alpha=self.alpha)

        # Assemble the information.
        def model_name(data):
            return data.split('-')[0]

        def app_type(data):
            return data.split('-')[1]

        def app_channel(data):
            data = data.split('-')[2:]
            return '-'.join(data)

        self.vlm_info_dict[group_name] = {}
        self.vlm_info_dict[group_name]['model_name'] = model_name(group_name)
        self.vlm_info_dict[group_name]['app_type'] = app_type(group_name)
        self.vlm_info_dict[group_name]['app_channel'] = app_channel(group_name)
        self.vlm_info_dict[group_name]['time_list'] = date_list
        self.vlm_info_dict[group_name]['value_list'] = df_g[field + '_mean'].values.tolist()
        self.vlm_info_dict[group_name]['count_list'] = df_g[field + '_count'].values.tolist()
        self.vlm_info_dict[group_name]['miss_rate_list'] = df_g[field + '_miss_rate'].values.tolist()
        self.vlm_info_dict[group_name]['zero_rate_list'] = df_g[field + '_zero_rate'].values.tolist()
        self.vlm_info_dict[group_name]['mean'] = mean
        self.vlm_info_dict[group_name]['std'] = std
        self.vlm_info_dict[group_name]['trend'] = trend
        self.vlm_info_dict[group_name]['h'] = h
        self.vlm_info_dict[group_name]['p'] = p
        if any([x > mean + 3 * std or x < mean - 3 * std
                for x in self.vlm_info_dict[group_name]['value_list']]):
            self.vlm_info_dict[group_name]['over_3std'] = True
        else:
            self.vlm_info_dict[group_name]['over_3std'] = False
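
    # Naming contract (implicit above): group_name is always built as
    # '<field_name>-<app_type>-<channel...>', so model_name/app_type/app_channel
    # recover the parts by splitting on '-'. Channel names may themselves
    # contain '-', which is why app_channel re-joins everything after the
    # second dash.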

    def process_data(self, field):
        '''
        Process the data of each user group covered by a field.
        :param field: str, the field to process.
        :return: .
        '''
        # Filter out dry-run data.
        df_copy = self.merge_data[['applied_at', 'applied_type', 'applied_channel', field]].copy()
        df_copy = self.filter_data(df_copy, field)
        if df_copy.shape[0] == 0:
            print('%s is still in dry run.' % self.field_query_name_dict[field])
            return None
        # Filter by application type.
        tmp_df = pd.DataFrame()
        for i in self.field_query_app_type_dict[field]:
            tmp_df = tmp_df.append(df_copy[df_copy['applied_type'] == int(i)])
        df_copy = tmp_df
        # Collect the covered user groups.
        user_group_dict = {}
        # user_group_dict = {'首申-融360': (1, 融360)}
        main_user_group_dict = {}
        app_type_dict = {1: '首申', 2: '复申', 3: '复贷'}
        df_copy_g = df_copy.groupby(['applied_type', 'applied_channel'])[field].count().sort_values(ascending=False)
        df_copy_g = df_copy_g.reset_index()
        df_copy_g = df_copy_g.loc[df_copy_g[field] > self.min_user_group / 6]
        # Groups that are too small are dropped outright.
        for i in range(df_copy_g.shape[0]):
            app_type = df_copy_g.iloc[i]['applied_type']
            app_chan = df_copy_g.iloc[i]['applied_channel']
            user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan)
            if df_copy_g.iloc[i][field] > int(self.min_user_group):
                main_user_group_dict[app_type_dict[app_type] + '-' + app_chan] = (app_type, app_chan)
        del df_copy_g
        # Filter out data outside the covered groups.
        tmp_df = pd.DataFrame()
        for user_group_name in user_group_dict:
            tmp_df = tmp_df.append(df_copy.loc[(df_copy['applied_type'] == user_group_dict[user_group_name][0])
                                               & (df_copy['applied_channel'] == user_group_dict[user_group_name][1])])
        df_copy = tmp_df
        # Guard: 'applied_type' may be missing entirely.
        try:
            df_copy['applied_type']
        except:
            return None
        ## Full covered sample.
        self.process_data_helper(group_name=self.field_query_name_dict[field] + '-全样本',
                                 df=df_copy, field=field)
        ## First application / re-application / repeat loan, where covered.
        if df_copy.loc[df_copy['applied_type'] == 1].shape[0] > int(self.min_user_group):
            self.process_data_helper(group_name=self.field_query_name_dict[field] + '-首申',
                                     df=df_copy.loc[df_copy['applied_type'] == 1], field=field)
        if df_copy.loc[df_copy['applied_type'] == 2].shape[0] > int(self.min_user_group):
            self.process_data_helper(group_name=self.field_query_name_dict[field] + '-复申',
                                     df=df_copy.loc[df_copy['applied_type'] == 2], field=field)
        if df_copy.loc[df_copy['applied_type'] == 3].shape[0] > int(self.min_user_group):
            self.process_data_helper(group_name=self.field_query_name_dict[field] + '-复贷',
                                     df=df_copy.loc[df_copy['applied_type'] == 3], field=field)
        ## Process each of the main user groups.
        for user_group_name in main_user_group_dict:
            self.process_data_helper(group_name=self.field_query_name_dict[field] + '-' + user_group_name,
                                     df=df_copy.loc[(df_copy['applied_type'] == main_user_group_dict[user_group_name][0])
                                                    & (df_copy['applied_channel'] == main_user_group_dict[user_group_name][1])],
                                     field=field)
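
    # Two thresholds are in play above, both derived from min_user_group
    # (already scaled by num_day / 30 in __init__): a (type, channel) group
    # enters user_group_dict, and thus stays in the data, once it exceeds
    # min_user_group / 6, but it only gets its own plot (main_user_group_dict)
    # above min_user_group itself.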

    def plot(self):
        '''
        Draw the plots from vlm_info_dict.
        '''
        for user_group_name in self.vlm_info_dict:
            print(user_group_name)
            plt.figure(figsize=(30, 15))
            # Main curve.
            plt.plot(range(self.num_day), self.vlm_info_dict[user_group_name]['value_list'])
            # Print the trend information.
            plt.text(x=0.9, y=0.75, s='Overall trend: %s\n' % self.vlm_info_dict[user_group_name]['trend'],
                     fontsize=15, transform=plt.gca().transAxes)
            # Mean and +/- 3 std lines.
            plt.hlines(y=self.vlm_info_dict[user_group_name]['mean'],
                       xmin=0, xmax=self.num_day - 1, colors='k', linestyles='--')
            plt.hlines(y=self.vlm_info_dict[user_group_name]['mean'] + 3 * self.vlm_info_dict[user_group_name]['std'],
                       xmin=0, xmax=self.num_day - 1, colors='r', linestyles='--')
            plt.hlines(y=self.vlm_info_dict[user_group_name]['mean'] - 3 * self.vlm_info_dict[user_group_name]['std'],
                       xmin=0, xmax=self.num_day - 1, colors='r', linestyles='--')
            # Table under the axes.
            cell_text = [[str(round(x, 3)) for x in self.vlm_info_dict[user_group_name]['value_list']],
                         [str(int(x)) for x in self.vlm_info_dict[user_group_name]['count_list']],
                         [str(100 * x)[:4] + '%' for x in self.vlm_info_dict[user_group_name]['miss_rate_list']],
                         [str(100 * x)[:4] + '%' for x in self.vlm_info_dict[user_group_name]['zero_rate_list']]]
            rows = ['value', 'count', 'miss_rate', 'zero_rate']
            cols = [x[5:] for x in self.vlm_info_dict[user_group_name]['time_list']]
            plt.table(cellText=cell_text, rowLabels=rows, colLabels=cols,
                      colWidths=[0.91 / (self.num_day - 1)] * self.num_day, loc='bottom')
            plt.subplots_adjust(left=.1, bottom=.15)
            # Hatch lines under the curve.
            plt.vlines(x=range(self.num_day),
                       ymin=[self.vlm_info_dict[user_group_name]['mean'] - 3.5 * self.vlm_info_dict[user_group_name]['std']] * self.num_day,
                       ymax=self.vlm_info_dict[user_group_name]['value_list'],
                       colors='lightgrey', linestyles='--')
            # Presentation.
            plt.title(user_group_name + '-mean', fontdict={'fontsize': 40})
            plt.grid()
            plt.xticks([])
            plt.subplots_adjust(left=0.03, right=0.99, top=0.94, bottom=0.08)
            # Save to separate folders.
            is_save = False
            if self.vlm_info_dict[user_group_name]['h'] == True:
                plt.savefig(self.fig_save_path + 'trend/' + user_group_name + '-mean')
                is_save = True
            if self.vlm_info_dict[user_group_name]['over_3std'] == True:
                plt.savefig(self.fig_save_path + 'over3std/' + user_group_name + '-mean')
                is_save = True
            if not is_save:
                plt.savefig(self.fig_save_path + 'image/' + user_group_name + '-mean')
            plt.show()
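
    # Routing of the saved figures: a group with a significant MK trend goes to
    # trend/, a group with any daily mean beyond mean +/- 3*std goes to
    # over3std/ (a figure can land in both), and everything unflagged falls
    # back to image/.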

    def save_vlm_info(self):
        self.vlm_info_df = pd.DataFrame(columns=['model_name', 'app_type', 'app_channel', 'group_name',
                                                 'trend', 'h', 'p', 'over_3std'])
        for g in self.vlm_info_dict:
            tmp_df = pd.DataFrame({'model_name': [self.vlm_info_dict[g]['model_name']],
                                   'app_type': [self.vlm_info_dict[g]['app_type']],
                                   'app_channel': [self.vlm_info_dict[g]['app_channel']],
                                   'group_name': [g],
                                   'trend': [self.vlm_info_dict[g]['trend']],
                                   'h': [self.vlm_info_dict[g]['h']],
                                   'p': [self.vlm_info_dict[g]['p']],
                                   'over_3std': [self.vlm_info_dict[g]['over_3std']]})
            self.vlm_info_df = self.vlm_info_df.append(tmp_df)
        self.vlm_info_df.to_csv(self.info_save_path + 'vlm_info.csv', index=False)
        with open(self.info_save_path + 'vlm_info.dict', 'wb') as f:
            pickle.dump(self.vlm_info_dict, f)
            f.close()

    def run(self):
        # Load the data.
        if self.if_read:
            mysql_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'MySQL']
            real_mysql_field = []
            for field in mysql_field:
                tmp_df = self.query_mysql('''SELECT %s FROM risk_analysis LIMIT 10''' % field)
                if tmp_df is not None:
                    real_mysql_field.append(field)
                    print('Field found in MySQL: %s' % self.field_query_name_dict[field])
                else:
                    print('Field not found in MySQL: %s' % self.field_query_name_dict[field])
                    # Drop the field.
                    idx = self.field_query_list.index(field)
                    self.field_query_list.pop(idx)
                    self.field_DB_list.pop(idx)
                    self.field_app_type_list.pop(idx)
                    self.field_name_list.pop(idx)
                    del self.field_query_name_dict[field]
                    del self.field_query_app_type_dict[field]
            if real_mysql_field:
                self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, passdue_day, %s \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"''' % (','.join(real_mysql_field), self.start_date, self.end_date))
            else:
                self.mysql_df = self.query_mysql('''SELECT order_no, applied_at, \
applied_from, applied_channel, passdue_day \
FROM risk_analysis \
WHERE applied_at >= "%s 00:00:00" \
AND applied_at <= "%s 00:00:00"''' % (self.start_date, self.end_date))
            if self.if_save:
                self.mysql_df.to_csv(self.data_save_path + 'mysql_data.csv', index=False)
        else:
            self.mysql_df = pd.read_csv(self.data_save_path + 'mysql_data.csv')
        print('MySQL data loaded.')

        def func_0(data):
            try:
                return int(int(data) + 1)
            except:
                return np.nan

        if self.if_read:
            condition = {'wf_created_at': {'$gte': '%s 00:00:00' % self.start_date,
                                           '$lte': '%s 00:00:00' % self.end_date}}
            fields = {'wf_biz_no': 1, 'wf_created_at': 1, 'wf_loan_type': 1}
            mongo_field = [x for i, x in enumerate(self.field_query_list) if self.field_DB_list[i] == 'mongoDB']
            for f in mongo_field:
                # Add the model-score names preset in the Excel sheet.
                fields[f] = 1
            self.mongo_df = self.query_mongo(condition, fields)
            self.mongo_df['applied_type'] = self.mongo_df['wf_loan_type'].apply(func_0)
            del self.mongo_df['wf_loan_type']
            if self.if_save:
                self.mongo_df.to_csv(self.data_save_path + 'mongo_data.csv', index=False)
        else:
            self.mongo_df = pd.read_csv(self.data_save_path + 'mongo_data.csv')
        self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].notna()]

        def func_1(data):
            try:
                int(data)
                return True
            except:
                return False

        self.mongo_df = self.mongo_df.loc[self.mongo_df['applied_type'].apply(func_1)]
        print('Mongo data loaded.')
        # Deduplicate the MySQL data.
        self.mysql_df = self.mysql_df.sort_values('passdue_day')
        self.mysql_df = self.mysql_df.drop_duplicates('order_no', keep='first')
        print('Deduplication finished.')
        # Join the data.
        self.merge_data = pd.merge(left=self.mysql_df, right=self.mongo_df,
                                   left_on='order_no', right_on='wf_biz_no', how='inner')
        print('Data joined.')

        # Clean the data.
        def clean_data(data):
            try:
                return float(data)
            except:
                return np.nan

        na_field_list = []
        for field in self.field_query_list:
            if field in self.merge_data.columns.tolist():
                print('Cleaning %s' % self.field_query_name_dict[field])
                self.merge_data[field] = self.merge_data[field].apply(clean_data)
            else:
                na_field_list.append(field)
        # Normalize the timestamps into a uniform string format.
        if repr(self.merge_data['applied_at'].dtype) == "dtype('O')":
            self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x[:10])
        else:
            self.merge_data['applied_at'] = self.merge_data['applied_at'].apply(lambda x: x.strftime('%Y-%m-%d'))
        # Restrict the data to the configured time range.
        self.merge_data = self.merge_data.loc[(self.merge_data['applied_at'] >= self.start_date)
                                              & (self.merge_data['applied_at'] <= self.end_date)]
        # Drop the fields that could not be extracted for whatever reason.
        print('The following fields are not included:')
        for field in na_field_list:
            print(self.field_query_name_dict[field])
            self.field_query_list.remove(field)
            self.field_name_list.remove(self.field_query_name_dict[field])
            del self.field_query_name_dict[field]
        # Process the data.
        print('Start processing data.')
        print('=' * 40)
        for field in self.field_query_list:
            self.process_data(field)
        print('Data processing finished.')
        print('=' * 40)
        # Plot.
        print('Start plotting.')
        print('=' * 40)
        self.plot()
        print('Plotting finished.')
        print('=' * 40)
        # Save the info.
        print('Start saving info.')
        print('=' * 40)
        self.save_vlm_info()
        print('Info saved.')
        print('=' * 40)


if __name__ == '__main__':
    pass
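
The __main__ guard is left empty in this commit. A minimal driver for the class, as a sketch only (it assumes the default model_score.xlsx sheet layout and reachable MySQL/MongoDB instances; none of this is part of the commit), could look like:

from VLM_time import VLMMonitor

monitor = VLMMonitor(excel_path='./model_score.xlsx',
                     sheet_name='model',
                     save_path='./vlm/',
                     start_date='2019-08-01',
                     end_date='2019-08-14')
monitor.run()  # queries, cleans, plots, then writes ./vlm/info/vlm_info.csv and vlm_info.dict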
model_score.xlsx  (view file @ c1985e9d)
No preview for this file type.